You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

577 lines
21KB

  1. package yggdrasil
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sort"
  8. "time"
  9. "github.com/gologme/log"
  10. "github.com/yggdrasil-network/yggdrasil-go/src/address"
  11. "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
  12. "github.com/Arceliar/phony"
  13. )
  14. // Peer represents a single peer object. This contains information from the
  15. // preferred switch port for this peer, although there may be more than one
  16. // active switch port connection to the peer in reality.
  17. //
  18. // This struct is informational only - you cannot manipulate peer connections
  19. // using instances of this struct. You should use the AddPeer or RemovePeer
  20. // functions instead.
  21. type Peer struct {
  22. PublicKey crypto.BoxPubKey // The public key of the remote node
  23. Endpoint string // The connection string used to connect to the peer
  24. BytesSent uint64 // Number of bytes sent to this peer
  25. BytesRecvd uint64 // Number of bytes received from this peer
  26. Protocol string // The transport protocol that this peer is connected with, typically "tcp"
  27. Port uint64 // Switch port number for this peer connection
  28. Uptime time.Duration // How long this peering has been active for
  29. }
  30. // SwitchPeer represents a switch connection to a peer. Note that there may be
  31. // multiple switch peers per actual peer, e.g. if there are multiple connections
  32. // to a given node.
  33. //
  34. // This struct is informational only - you cannot manipulate switch peer
  35. // connections using instances of this struct. You should use the AddPeer or
  36. // RemovePeer functions instead.
  37. type SwitchPeer struct {
  38. PublicKey crypto.BoxPubKey // The public key of the remote node
  39. Coords []uint64 // The coordinates of the remote node
  40. BytesSent uint64 // Number of bytes sent via this switch port
  41. BytesRecvd uint64 // Number of bytes received via this switch port
  42. Port uint64 // Switch port number for this switch peer
  43. Protocol string // The transport protocol that this switch port is connected with, typically "tcp"
  44. Endpoint string // The connection string used to connect to the switch peer
  45. }
  46. // DHTEntry represents a single DHT entry that has been learned or cached from
  47. // DHT searches.
  48. type DHTEntry struct {
  49. PublicKey crypto.BoxPubKey
  50. Coords []uint64
  51. LastSeen time.Duration
  52. }
  53. // DHTRes represents a DHT response, as returned by DHTPing.
  54. type DHTRes struct {
  55. PublicKey crypto.BoxPubKey // key of the sender
  56. Coords []uint64 // coords of the sender
  57. Dest crypto.NodeID // the destination node ID
  58. Infos []DHTEntry // response
  59. }
  60. // NodeInfoPayload represents a RequestNodeInfo response, in bytes.
  61. type NodeInfoPayload []byte
  62. // SwitchQueues represents information from the switch related to link
  63. // congestion and a list of switch queues created in response to congestion on a
  64. // given link.
  65. type SwitchQueues struct {
  66. Queues []SwitchQueue // An array of SwitchQueue objects containing information about individual queues
  67. Count uint64 // The current number of active switch queues
  68. Size uint64 // The current total size of active switch queues
  69. HighestCount uint64 // The highest recorded number of switch queues so far
  70. HighestSize uint64 // The highest recorded total size of switch queues so far
  71. MaximumSize uint64 // The maximum allowed total size of switch queues, as specified by config
  72. }
  73. // SwitchQueue represents a single switch queue. Switch queues are only created
  74. // in response to congestion on a given link and represent how much data has
  75. // been temporarily cached for sending once the congestion has cleared.
  76. type SwitchQueue struct {
  77. ID string // The ID of the switch queue
  78. Size uint64 // The total size, in bytes, of the queue
  79. Packets uint64 // The number of packets in the queue
  80. Port uint64 // The switch port to which the queue applies
  81. }
  82. // Session represents an open session with another node. Sessions are opened in
  83. // response to traffic being exchanged between two nodes using Conn objects.
  84. // Note that sessions will automatically be closed by Yggdrasil if no traffic is
  85. // exchanged for around two minutes.
  86. type Session struct {
  87. PublicKey crypto.BoxPubKey // The public key of the remote node
  88. Coords []uint64 // The coordinates of the remote node
  89. BytesSent uint64 // Bytes sent to the session
  90. BytesRecvd uint64 // Bytes received from the session
  91. MTU MTU // The maximum supported message size of the session
  92. Uptime time.Duration // How long this session has been active for
  93. WasMTUFixed bool // This field is no longer used
  94. }
  95. // GetPeers returns one or more Peer objects containing information about active
  96. // peerings with other Yggdrasil nodes, where one of the responses always
  97. // includes information about the current node (with a port number of 0). If
  98. // there is exactly one entry then this node is not connected to any other nodes
  99. // and is therefore isolated.
  100. func (c *Core) GetPeers() []Peer {
  101. ports := c.peers.ports.Load().(map[switchPort]*peer)
  102. var peers []Peer
  103. var ps []switchPort
  104. for port := range ports {
  105. ps = append(ps, port)
  106. }
  107. sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
  108. for _, port := range ps {
  109. p := ports[port]
  110. var info Peer
  111. phony.Block(p, func() {
  112. info = Peer{
  113. Endpoint: p.intf.name,
  114. BytesSent: p.bytesSent,
  115. BytesRecvd: p.bytesRecvd,
  116. Protocol: p.intf.info.linkType,
  117. Port: uint64(port),
  118. Uptime: time.Since(p.firstSeen),
  119. }
  120. copy(info.PublicKey[:], p.box[:])
  121. })
  122. peers = append(peers, info)
  123. }
  124. return peers
  125. }
  126. // GetSwitchPeers returns zero or more SwitchPeer objects containing information
  127. // about switch port connections with other Yggdrasil nodes. Note that, unlike
  128. // GetPeers, GetSwitchPeers does not include information about the current node,
  129. // therefore it is possible for this to return zero elements if the node is
  130. // isolated or not connected to any peers.
  131. func (c *Core) GetSwitchPeers() []SwitchPeer {
  132. var switchpeers []SwitchPeer
  133. table := c.switchTable.table.Load().(lookupTable)
  134. peers := c.peers.ports.Load().(map[switchPort]*peer)
  135. for _, elem := range table.elems {
  136. peer, isIn := peers[elem.port]
  137. if !isIn {
  138. continue
  139. }
  140. coords := elem.locator.getCoords()
  141. var info SwitchPeer
  142. phony.Block(peer, func() {
  143. info = SwitchPeer{
  144. Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...),
  145. BytesSent: peer.bytesSent,
  146. BytesRecvd: peer.bytesRecvd,
  147. Port: uint64(elem.port),
  148. Protocol: peer.intf.info.linkType,
  149. Endpoint: peer.intf.info.remote,
  150. }
  151. copy(info.PublicKey[:], peer.box[:])
  152. })
  153. switchpeers = append(switchpeers, info)
  154. }
  155. return switchpeers
  156. }
  157. // GetDHT returns zero or more entries as stored in the DHT, cached primarily
  158. // from searches that have already taken place.
  159. func (c *Core) GetDHT() []DHTEntry {
  160. var dhtentries []DHTEntry
  161. getDHT := func() {
  162. now := time.Now()
  163. var dhtentry []*dhtInfo
  164. for _, v := range c.router.dht.table {
  165. dhtentry = append(dhtentry, v)
  166. }
  167. sort.SliceStable(dhtentry, func(i, j int) bool {
  168. return dht_ordered(&c.router.dht.nodeID, dhtentry[i].getNodeID(), dhtentry[j].getNodeID())
  169. })
  170. for _, v := range dhtentry {
  171. info := DHTEntry{
  172. Coords: append([]uint64{}, wire_coordsBytestoUint64s(v.coords)...),
  173. LastSeen: now.Sub(v.recv),
  174. }
  175. copy(info.PublicKey[:], v.key[:])
  176. dhtentries = append(dhtentries, info)
  177. }
  178. }
  179. phony.Block(&c.router, getDHT)
  180. return dhtentries
  181. }
  182. // GetSwitchQueues returns information about the switch queues that are
  183. // currently in effect. These values can change within an instant.
  184. func (c *Core) GetSwitchQueues() SwitchQueues {
  185. var switchqueues SwitchQueues
  186. switchTable := &c.switchTable
  187. getSwitchQueues := func() {
  188. switchqueues = SwitchQueues{
  189. Count: uint64(len(switchTable.queues.bufs)),
  190. Size: switchTable.queues.size,
  191. HighestCount: uint64(switchTable.queues.maxbufs),
  192. HighestSize: switchTable.queues.maxsize,
  193. MaximumSize: switchTable.queues.totalMaxSize,
  194. }
  195. for k, v := range switchTable.queues.bufs {
  196. nexthop := switchTable.bestPortForCoords([]byte(k))
  197. queue := SwitchQueue{
  198. ID: k,
  199. Size: v.size,
  200. Packets: uint64(len(v.packets)),
  201. Port: uint64(nexthop),
  202. }
  203. switchqueues.Queues = append(switchqueues.Queues, queue)
  204. }
  205. }
  206. phony.Block(&c.switchTable, getSwitchQueues)
  207. return switchqueues
  208. }
  209. // GetSessions returns a list of open sessions from this node to other nodes.
  210. func (c *Core) GetSessions() []Session {
  211. var sessions []Session
  212. getSessions := func() {
  213. for _, sinfo := range c.router.sessions.sinfos {
  214. var session Session
  215. workerFunc := func() {
  216. session = Session{
  217. Coords: append([]uint64{}, wire_coordsBytestoUint64s(sinfo.coords)...),
  218. MTU: sinfo._getMTU(),
  219. BytesSent: sinfo.bytesSent,
  220. BytesRecvd: sinfo.bytesRecvd,
  221. Uptime: time.Now().Sub(sinfo.timeOpened),
  222. WasMTUFixed: sinfo.wasMTUFixed,
  223. }
  224. copy(session.PublicKey[:], sinfo.theirPermPub[:])
  225. }
  226. phony.Block(sinfo, workerFunc)
  227. // TODO? skipped known but timed out sessions?
  228. sessions = append(sessions, session)
  229. }
  230. }
  231. phony.Block(&c.router, getSessions)
  232. return sessions
  233. }
  234. // ConnListen returns a listener for Yggdrasil session connections. You can only
  235. // call this function once as each Yggdrasil node can only have a single
  236. // ConnListener. Make sure to keep the reference to this for as long as it is
  237. // needed.
  238. func (c *Core) ConnListen() (*Listener, error) {
  239. c.router.sessions.listenerMutex.Lock()
  240. defer c.router.sessions.listenerMutex.Unlock()
  241. if c.router.sessions.listener != nil {
  242. return nil, errors.New("a listener already exists")
  243. }
  244. c.router.sessions.listener = &Listener{
  245. core: c,
  246. conn: make(chan *Conn),
  247. close: make(chan interface{}),
  248. }
  249. return c.router.sessions.listener, nil
  250. }
  251. // ConnDialer returns a dialer for Yggdrasil session connections. Since
  252. // ConnDialers are stateless, you can request as many dialers as you like,
  253. // although ideally you should request only one and keep the reference to it for
  254. // as long as it is needed.
  255. func (c *Core) ConnDialer() (*Dialer, error) {
  256. return &Dialer{
  257. core: c,
  258. }, nil
  259. }
  260. // ListenTCP starts a new TCP listener. The input URI should match that of the
  261. // "Listen" configuration item, e.g.
  262. // tcp://a.b.c.d:e
  263. func (c *Core) ListenTCP(uri string) (*TcpListener, error) {
  264. return c.link.tcp.listen(uri, nil)
  265. }
  266. // ListenTLS starts a new TLS listener. The input URI should match that of the
  267. // "Listen" configuration item, e.g.
  268. // tls://a.b.c.d:e
  269. func (c *Core) ListenTLS(uri string) (*TcpListener, error) {
  270. return c.link.tcp.listen(uri, c.link.tcp.tls.forListener)
  271. }
  272. // NodeID gets the node ID. This is derived from your router encryption keys.
  273. // Remote nodes wanting to open connections to your node will need to know your
  274. // node ID.
  275. func (c *Core) NodeID() *crypto.NodeID {
  276. return crypto.GetNodeID(&c.boxPub)
  277. }
  278. // TreeID gets the tree ID. This is derived from your switch signing keys. There
  279. // is typically no need to share this key.
  280. func (c *Core) TreeID() *crypto.TreeID {
  281. return crypto.GetTreeID(&c.sigPub)
  282. }
  283. // SigningPublicKey gets the node's signing public key, as used by the switch.
  284. func (c *Core) SigningPublicKey() string {
  285. return hex.EncodeToString(c.sigPub[:])
  286. }
  287. // EncryptionPublicKey gets the node's encryption public key, as used by the
  288. // router.
  289. func (c *Core) EncryptionPublicKey() string {
  290. return hex.EncodeToString(c.boxPub[:])
  291. }
  292. // Coords returns the current coordinates of the node. Note that these can
  293. // change at any time for a number of reasons, not limited to but including
  294. // changes to peerings (either yours or a parent nodes) or changes to the network
  295. // root.
  296. //
  297. // This function may return an empty array - this is normal behaviour if either
  298. // you are the root of the network that you are connected to, or you are not
  299. // connected to any other nodes (effectively making you the root of a
  300. // single-node network).
  301. func (c *Core) Coords() []uint64 {
  302. table := c.switchTable.table.Load().(lookupTable)
  303. return wire_coordsBytestoUint64s(table.self.getCoords())
  304. }
  305. // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
  306. // address. The IPv6 address is only relevant when the node is operating as an
  307. // IP router and often is meaningless when embedded into an application, unless
  308. // that application also implements either VPN functionality or deals with IP
  309. // packets specifically.
  310. func (c *Core) Address() net.IP {
  311. address := net.IP(address.AddrForNodeID(c.NodeID())[:])
  312. return address
  313. }
  314. // Subnet gets the routed IPv6 subnet of the Yggdrasil node. This is always a
  315. // /64 subnet. The IPv6 subnet is only relevant when the node is operating as an
  316. // IP router and often is meaningless when embedded into an application, unless
  317. // that application also implements either VPN functionality or deals with IP
  318. // packets specifically.
  319. func (c *Core) Subnet() net.IPNet {
  320. subnet := address.SubnetForNodeID(c.NodeID())[:]
  321. subnet = append(subnet, 0, 0, 0, 0, 0, 0, 0, 0)
  322. return net.IPNet{IP: subnet, Mask: net.CIDRMask(64, 128)}
  323. }
  324. // MyNodeInfo gets the currently configured nodeinfo. NodeInfo is typically
  325. // specified through the "NodeInfo" option in the node configuration or using
  326. // the SetNodeInfo function, although it may also contain other built-in values
  327. // such as "buildname", "buildversion" etc.
  328. func (c *Core) MyNodeInfo() NodeInfoPayload {
  329. return c.router.nodeinfo.getNodeInfo()
  330. }
  331. // SetNodeInfo sets the local nodeinfo. Note that nodeinfo can be any value or
  332. // struct, it will be serialised into JSON automatically.
  333. func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
  334. c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
  335. }
  336. // GetMaximumSessionMTU returns the maximum allowed session MTU size.
  337. func (c *Core) GetMaximumSessionMTU() MTU {
  338. var mtu MTU
  339. phony.Block(&c.router, func() {
  340. mtu = c.router.sessions.myMaximumMTU
  341. })
  342. return mtu
  343. }
  344. // SetMaximumSessionMTU sets the maximum allowed session MTU size. The default
  345. // value is 65535 bytes. Session pings will be sent to update all open sessions
  346. // if the MTU has changed.
  347. func (c *Core) SetMaximumSessionMTU(mtu MTU) {
  348. phony.Block(&c.router, func() {
  349. if c.router.sessions.myMaximumMTU != mtu {
  350. c.router.sessions.myMaximumMTU = mtu
  351. c.router.sessions.reconfigure()
  352. }
  353. })
  354. }
  355. // GetNodeInfo requests nodeinfo from a remote node, as specified by the public
  356. // key and coordinates specified. The third parameter specifies whether a cached
  357. // result is acceptable - this results in less traffic being generated than is
  358. // necessary when, e.g. crawling the network.
  359. func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) {
  360. response := make(chan *NodeInfoPayload, 1)
  361. c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) {
  362. defer func() { recover() }()
  363. select {
  364. case response <- nodeinfo:
  365. default:
  366. }
  367. })
  368. c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false)
  369. phony.Block(&c.router.nodeinfo, func() {}) // Wait for sendNodeInfo before starting timer
  370. timer := time.AfterFunc(6*time.Second, func() { close(response) })
  371. defer timer.Stop()
  372. for res := range response {
  373. return *res, nil
  374. }
  375. return NodeInfoPayload{}, fmt.Errorf("getNodeInfo timeout: %s", hex.EncodeToString(key[:]))
  376. }
  377. // SetSessionGatekeeper allows you to configure a handler function for deciding
  378. // whether a session should be allowed or not. The default session firewall is
  379. // implemented in this way. The function receives the public key of the remote
  380. // side and a boolean which is true if we initiated the session or false if we
  381. // received an incoming session request. The function should return true to
  382. // allow the session or false to reject it.
  383. func (c *Core) SetSessionGatekeeper(f func(pubkey *crypto.BoxPubKey, initiator bool) bool) {
  384. c.router.sessions.isAllowedMutex.Lock()
  385. defer c.router.sessions.isAllowedMutex.Unlock()
  386. c.router.sessions.isAllowedHandler = f
  387. }
  388. // SetLogger sets the output logger of the Yggdrasil node after startup. This
  389. // may be useful if you want to redirect the output later. Note that this
  390. // expects a Logger from the github.com/gologme/log package and not from Go's
  391. // built-in log package.
  392. func (c *Core) SetLogger(log *log.Logger) {
  393. c.log = log
  394. }
  395. // AddPeer adds a peer. This should be specified in the peer URI format, e.g.:
  396. // tcp://a.b.c.d:e
  397. // socks://a.b.c.d:e/f.g.h.i:j
  398. // This adds the peer to the peer list, so that they will be called again if the
  399. // connection drops.
  400. func (c *Core) AddPeer(addr string, sintf string) error {
  401. if err := c.CallPeer(addr, sintf); err != nil {
  402. // TODO: We maybe want this to write the peer to the persistent
  403. // configuration even if a connection attempt fails, but first we'll need to
  404. // move the code to check the peer URI so that we don't deliberately save a
  405. // peer with a known bad URI. Loading peers from config should really do the
  406. // same thing too but I don't think that happens today
  407. return err
  408. }
  409. c.config.Mutex.Lock()
  410. defer c.config.Mutex.Unlock()
  411. if sintf == "" {
  412. for _, peer := range c.config.Current.Peers {
  413. if peer == addr {
  414. return errors.New("peer already added")
  415. }
  416. }
  417. c.config.Current.Peers = append(c.config.Current.Peers, addr)
  418. } else {
  419. if _, ok := c.config.Current.InterfacePeers[sintf]; ok {
  420. for _, peer := range c.config.Current.InterfacePeers[sintf] {
  421. if peer == addr {
  422. return errors.New("peer already added")
  423. }
  424. }
  425. }
  426. if _, ok := c.config.Current.InterfacePeers[sintf]; !ok {
  427. c.config.Current.InterfacePeers[sintf] = []string{addr}
  428. } else {
  429. c.config.Current.InterfacePeers[sintf] = append(c.config.Current.InterfacePeers[sintf], addr)
  430. }
  431. }
  432. return nil
  433. }
  434. func (c *Core) RemovePeer(addr string, sintf string) error {
  435. if sintf == "" {
  436. for i, peer := range c.config.Current.Peers {
  437. if peer == addr {
  438. c.config.Current.Peers = append(c.config.Current.Peers[:i], c.config.Current.Peers[i+1:]...)
  439. break
  440. }
  441. }
  442. } else if _, ok := c.config.Current.InterfacePeers[sintf]; ok {
  443. for i, peer := range c.config.Current.InterfacePeers[sintf] {
  444. if peer == addr {
  445. c.config.Current.InterfacePeers[sintf] = append(c.config.Current.InterfacePeers[sintf][:i], c.config.Current.InterfacePeers[sintf][i+1:]...)
  446. break
  447. }
  448. }
  449. }
  450. ports := c.peers.ports.Load().(map[switchPort]*peer)
  451. for p, peer := range ports {
  452. if addr == peer.intf.name {
  453. c.peers.removePeer(p)
  454. }
  455. }
  456. return nil
  457. }
  458. // CallPeer calls a peer once. This should be specified in the peer URI format,
  459. // e.g.:
  460. // tcp://a.b.c.d:e
  461. // socks://a.b.c.d:e/f.g.h.i:j
  462. // This does not add the peer to the peer list, so if the connection drops, the
  463. // peer will not be called again automatically.
  464. func (c *Core) CallPeer(addr string, sintf string) error {
  465. return c.link.call(addr, sintf)
  466. }
  467. // DisconnectPeer disconnects a peer once. This should be specified as a port
  468. // number.
  469. func (c *Core) DisconnectPeer(port uint64) error {
  470. c.peers.removePeer(switchPort(port))
  471. return nil
  472. }
  473. // GetAllowedEncryptionPublicKeys returns the public keys permitted for incoming
  474. // peer connections. If this list is empty then all incoming peer connections
  475. // are accepted by default.
  476. func (c *Core) GetAllowedEncryptionPublicKeys() []string {
  477. return c.peers.getAllowedEncryptionPublicKeys()
  478. }
  479. // AddAllowedEncryptionPublicKey whitelists a key for incoming peer connections.
  480. // By default all incoming peer connections are accepted, but adding public keys
  481. // to the whitelist using this function enables strict checking from that point
  482. // forward. Once the whitelist is enabled, only peer connections from
  483. // whitelisted public keys will be accepted.
  484. func (c *Core) AddAllowedEncryptionPublicKey(bstr string) (err error) {
  485. c.peers.addAllowedEncryptionPublicKey(bstr)
  486. return nil
  487. }
  488. // RemoveAllowedEncryptionPublicKey removes a key from the whitelist for
  489. // incoming peer connections. If none are set, an empty list permits all
  490. // incoming connections.
  491. func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) {
  492. c.peers.removeAllowedEncryptionPublicKey(bstr)
  493. return nil
  494. }
  495. // DHTPing sends a DHT ping to the node with the provided key and coords,
  496. // optionally looking up the specified target NodeID.
  497. func (c *Core) DHTPing(key crypto.BoxPubKey, coords []uint64, target *crypto.NodeID) (DHTRes, error) {
  498. resCh := make(chan *dhtRes, 1)
  499. info := dhtInfo{
  500. key: key,
  501. coords: wire_coordsUint64stoBytes(coords),
  502. }
  503. if target == nil {
  504. target = info.getNodeID()
  505. }
  506. rq := dhtReqKey{info.key, *target}
  507. sendPing := func() {
  508. c.router.dht.addCallback(&rq, func(res *dhtRes) {
  509. resCh <- res
  510. })
  511. c.router.dht.ping(&info, &rq.dest)
  512. }
  513. phony.Block(&c.router, sendPing)
  514. // TODO: do something better than the below...
  515. res := <-resCh
  516. if res != nil {
  517. r := DHTRes{
  518. Coords: append([]uint64{}, wire_coordsBytestoUint64s(res.Coords)...),
  519. }
  520. copy(r.PublicKey[:], res.Key[:])
  521. for _, i := range res.Infos {
  522. e := DHTEntry{
  523. Coords: append([]uint64{}, wire_coordsBytestoUint64s(i.coords)...),
  524. }
  525. copy(e.PublicKey[:], i.key[:])
  526. r.Infos = append(r.Infos, e)
  527. }
  528. return r, nil
  529. }
  530. return DHTRes{}, fmt.Errorf("DHT ping timeout: %s", hex.EncodeToString(key[:]))
  531. }