You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

460 lines
12KB

  1. // +build !lint
  2. package main
  3. import (
  4. "bufio"
  5. "flag"
  6. "fmt"
  7. "os"
  8. "runtime"
  9. "runtime/pprof"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/gologme/log"
  14. . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
  15. . "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
  16. )
  17. ////////////////////////////////////////////////////////////////////////////////
// Node wraps a single simulated yggdrasil core together with the debug
// channels used to inject and observe traffic in the simulation.
type Node struct {
	index int           // simulation-assigned identifier, used in log output
	core  Core          // the yggdrasil core instance being simulated
	send  chan<- []byte // packets written here are sent "from" this node
	recv  <-chan []byte // packets delivered "to" this node are read from here
}
// init records the node's simulation index, initializes the underlying
// core, and hooks up the debug send/recv channels the sim drives traffic
// through. DEBUG_simFixMTU pins the MTU for the simulated link layer.
func (n *Node) init(index int) {
	n.index = index
	n.core.Init()
	n.send = n.core.DEBUG_getSend()
	n.recv = n.core.DEBUG_getRecv()
	n.core.DEBUG_simFixMTU()
}
  31. func (n *Node) printTraffic() {
  32. for {
  33. packet := <-n.recv
  34. fmt.Println(n.index, packet)
  35. //panic("Got a packet")
  36. }
  37. }
// startPeers would start this node's peer main loops; everything it used
// to do is commented out, so it is currently a no-op kept for symmetry
// with the call sites in startNetwork.
func (n *Node) startPeers() {
	//for _, p := range n.core.Peers.Ports {
	// go p.MainLoop()
	//}
	//go n.printTraffic()
	//n.core.Peers.DEBUG_startPeers()
}
  45. func linkNodes(m, n *Node) {
  46. // Don't allow duplicates
  47. if m.core.DEBUG_getPeers().DEBUG_hasPeer(n.core.DEBUG_getSigningPublicKey()) {
  48. return
  49. }
  50. // Create peers
  51. // Buffering reduces packet loss in the sim
  52. // This slightly speeds up testing (fewer delays before retrying a ping)
  53. pLinkPub, pLinkPriv := m.core.DEBUG_newBoxKeys()
  54. qLinkPub, qLinkPriv := m.core.DEBUG_newBoxKeys()
  55. p := m.core.DEBUG_getPeers().DEBUG_newPeer(n.core.DEBUG_getEncryptionPublicKey(),
  56. n.core.DEBUG_getSigningPublicKey(), *m.core.DEBUG_getSharedKey(pLinkPriv, qLinkPub))
  57. q := n.core.DEBUG_getPeers().DEBUG_newPeer(m.core.DEBUG_getEncryptionPublicKey(),
  58. m.core.DEBUG_getSigningPublicKey(), *n.core.DEBUG_getSharedKey(qLinkPriv, pLinkPub))
  59. DEBUG_simLinkPeers(p, q)
  60. return
  61. }
  62. func makeStoreSquareGrid(sideLength int) map[int]*Node {
  63. store := make(map[int]*Node)
  64. nNodes := sideLength * sideLength
  65. idxs := make([]int, 0, nNodes)
  66. // TODO shuffle nodeIDs
  67. for idx := 1; idx <= nNodes; idx++ {
  68. idxs = append(idxs, idx)
  69. }
  70. for _, idx := range idxs {
  71. node := &Node{}
  72. node.init(idx)
  73. store[idx] = node
  74. }
  75. for idx := 0; idx < nNodes; idx++ {
  76. if (idx % sideLength) != 0 {
  77. linkNodes(store[idxs[idx]], store[idxs[idx-1]])
  78. }
  79. if idx >= sideLength {
  80. linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]])
  81. }
  82. }
  83. //for _, node := range store { node.initPorts() }
  84. return store
  85. }
  86. func makeStoreStar(nNodes int) map[int]*Node {
  87. store := make(map[int]*Node)
  88. center := &Node{}
  89. center.init(0)
  90. store[0] = center
  91. for idx := 1; idx < nNodes; idx++ {
  92. node := &Node{}
  93. node.init(idx)
  94. store[idx] = node
  95. linkNodes(center, node)
  96. }
  97. return store
  98. }
  99. func loadGraph(path string) map[int]*Node {
  100. f, err := os.Open(path)
  101. if err != nil {
  102. panic(err)
  103. }
  104. defer f.Close()
  105. store := make(map[int]*Node)
  106. s := bufio.NewScanner(f)
  107. for s.Scan() {
  108. line := s.Text()
  109. nodeIdxstrs := strings.Split(line, " ")
  110. nodeIdx0, _ := strconv.Atoi(nodeIdxstrs[0])
  111. nodeIdx1, _ := strconv.Atoi(nodeIdxstrs[1])
  112. if store[nodeIdx0] == nil {
  113. node := &Node{}
  114. node.init(nodeIdx0)
  115. store[nodeIdx0] = node
  116. }
  117. if store[nodeIdx1] == nil {
  118. node := &Node{}
  119. node.init(nodeIdx1)
  120. store[nodeIdx1] = node
  121. }
  122. linkNodes(store[nodeIdx0], store[nodeIdx1])
  123. }
  124. //for _, node := range store { node.initPorts() }
  125. return store
  126. }
  127. ////////////////////////////////////////////////////////////////////////////////
// startNetwork starts peer handling on every node in the keyed store.
// (Effectively a no-op at present, since Node.startPeers is commented out.)
func startNetwork(store map[[32]byte]*Node) {
	for _, node := range store {
		node.startPeers()
	}
}
  133. func getKeyedStore(store map[int]*Node) map[[32]byte]*Node {
  134. newStore := make(map[[32]byte]*Node)
  135. for _, node := range store {
  136. newStore[node.core.DEBUG_getSigningPublicKey()] = node
  137. }
  138. return newStore
  139. }
// testPaths walks the switch routing tables: for every (source, dest)
// pair it repeatedly asks the current node's switch for the next hop
// toward dest's coordinates and follows the path hop by hop. It returns
// false on a suspected routing loop (path longer than 4096 hops), a
// blackhole (nil peer on the chosen port), or a drop (next hop is the
// current node); true if every pair converges.
func testPaths(store map[[32]byte]*Node) bool {
	nNodes := len(store)
	count := 0
	for _, source := range store {
		count++
		fmt.Printf("Testing paths from node %d / %d (%d)\n", count, nNodes, source.index)
		for _, dest := range store {
			//if source == dest { continue }
			destLoc := dest.core.DEBUG_getLocator()
			coords := destLoc.DEBUG_getCoords()
			temp := 0 // hop counter, used as a loop-detection bound
			ttl := ^uint64(0)
			// NOTE(review): oldTTL is never updated after this point, so the
			// "Drop:" message below always prints the initial max value.
			oldTTL := ttl
			for here := source; here != dest; {
				temp++
				if temp > 4096 {
					// Far longer than any sane path in these topologies:
					// assume the switch tables contain a loop.
					fmt.Println("Loop?")
					time.Sleep(time.Second)
					return false
				}
				nextPort := here.core.DEBUG_switchLookup(coords)
				// First check if "here" is accepting packets from the previous node
				// TODO explain how this works
				ports := here.core.DEBUG_getPeers().DEBUG_getPorts()
				nextPeer := ports[nextPort]
				if nextPeer == nil {
					fmt.Println("Peer associated with next port is nil")
					return false
				}
				next := store[nextPeer.DEBUG_getSigKey()]
				/*
					if next == here {
						//for idx, link := range here.links {
						// fmt.Println("DUMP:", idx, link.nodeID)
						//}
						if nextPort != 0 { panic("This should not be") }
						fmt.Println("Failed to route:", source.index, here.index, dest.index, oldTTL, ttl)
						//here.table.DEBUG_dumpTable()
						//fmt.Println("Ports:", here.nodeID, here.ports)
						return false
						panic(fmt.Sprintln("Routing Loop:",
							source.index,
							here.index,
							dest.index))
					}
				*/
				if temp > 4090 {
					// Nearly at the loop bound: dump locators to aid debugging.
					fmt.Println("DEBUG:",
						source.index, source.core.DEBUG_getLocator(),
						here.index, here.core.DEBUG_getLocator(),
						dest.index, dest.core.DEBUG_getLocator())
					//here.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
				}
				if here != source {
					// This is sufficient to check for routing loops or blackholes
					//break
					// (intentionally empty: the early-exit above is disabled)
				}
				if here == next {
					fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL)
					return false
				}
				here = next
			}
		}
	}
	return true
}
  207. func stressTest(store map[[32]byte]*Node) {
  208. fmt.Println("Stress testing network...")
  209. nNodes := len(store)
  210. dests := make([][]byte, 0, nNodes)
  211. for _, dest := range store {
  212. loc := dest.core.DEBUG_getLocator()
  213. coords := loc.DEBUG_getCoords()
  214. dests = append(dests, coords)
  215. }
  216. lookups := 0
  217. start := time.Now()
  218. for _, source := range store {
  219. for _, coords := range dests {
  220. source.core.DEBUG_switchLookup(coords)
  221. lookups++
  222. }
  223. }
  224. timed := time.Since(start)
  225. fmt.Printf("%d lookups in %s (%f lookups per second)\n",
  226. lookups,
  227. timed,
  228. float64(lookups)/timed.Seconds())
  229. }
  230. func pingNodes(store map[[32]byte]*Node) {
  231. fmt.Println("Sending pings...")
  232. nNodes := len(store)
  233. count := 0
  234. equiv := func(a []byte, b []byte) bool {
  235. if len(a) != len(b) {
  236. return false
  237. }
  238. for idx := 0; idx < len(a); idx++ {
  239. if a[idx] != b[idx] {
  240. return false
  241. }
  242. }
  243. return true
  244. }
  245. for _, source := range store {
  246. count++
  247. //if count > 16 { break }
  248. fmt.Printf("Sending packets from node %d/%d (%d)\n", count, nNodes, source.index)
  249. sourceKey := source.core.DEBUG_getEncryptionPublicKey()
  250. payload := sourceKey[:]
  251. sourceAddr := source.core.DEBUG_getAddr()[:]
  252. sendTo := func(bs []byte, destAddr []byte) {
  253. packet := make([]byte, 40+len(bs))
  254. copy(packet[8:24], sourceAddr)
  255. copy(packet[24:40], destAddr)
  256. copy(packet[40:], bs)
  257. packet[0] = 6 << 4
  258. source.send <- packet
  259. }
  260. destCount := 0
  261. for _, dest := range store {
  262. destCount += 1
  263. fmt.Printf("%d Nodes, %d Send, %d Recv\n", nNodes, count, destCount)
  264. if dest == source {
  265. fmt.Println("Skipping self")
  266. continue
  267. }
  268. destAddr := dest.core.DEBUG_getAddr()[:]
  269. ticker := time.NewTicker(150 * time.Millisecond)
  270. sendTo(payload, destAddr)
  271. for loop := true; loop; {
  272. select {
  273. case packet := <-dest.recv:
  274. {
  275. if equiv(payload, packet[len(packet)-len(payload):]) {
  276. loop = false
  277. }
  278. }
  279. case <-ticker.C:
  280. sendTo(payload, destAddr)
  281. //dumpDHTSize(store) // note that this uses racey functions to read things...
  282. }
  283. }
  284. ticker.Stop()
  285. }
  286. //break // Only try sending pings from 1 node
  287. // This is because, for some reason, stopTun() doesn't always close it
  288. // And if two tuns are up, bad things happen (sends via wrong interface)
  289. }
  290. fmt.Println("Finished pinging nodes")
  291. }
  292. func pingBench(store map[[32]byte]*Node) {
  293. fmt.Println("Benchmarking pings...")
  294. nPings := 0
  295. payload := make([]byte, 1280+40) // MTU + ipv6 header
  296. var timed time.Duration
  297. //nNodes := len(store)
  298. count := 0
  299. for _, source := range store {
  300. count++
  301. //fmt.Printf("Sending packets from node %d/%d (%d)\n", count, nNodes, source.index)
  302. getPing := func(key [32]byte, decodedCoords []byte) []byte {
  303. // TODO write some function to do this the right way, put... somewhere...
  304. coords := DEBUG_wire_encode_coords(decodedCoords)
  305. packet := make([]byte, 0, len(key)+len(coords)+len(payload))
  306. packet = append(packet, key[:]...)
  307. packet = append(packet, coords...)
  308. packet = append(packet, payload[:]...)
  309. return packet
  310. }
  311. for _, dest := range store {
  312. key := dest.core.DEBUG_getEncryptionPublicKey()
  313. loc := dest.core.DEBUG_getLocator()
  314. coords := loc.DEBUG_getCoords()
  315. ping := getPing(key, coords)
  316. // TODO make sure the session is open first
  317. start := time.Now()
  318. for i := 0; i < 1000000; i++ {
  319. source.send <- ping
  320. nPings++
  321. }
  322. timed += time.Since(start)
  323. break
  324. }
  325. break
  326. }
  327. fmt.Printf("Sent %d pings in %s (%f per second)\n",
  328. nPings,
  329. timed,
  330. float64(nPings)/timed.Seconds())
  331. }
// dumpStore prints each node's index and locator and dumps its switch
// table. NOTE(review): this store is keyed by NodeID, unlike the
// [32]byte-keyed stores used by the other functions in this file —
// confirm callers actually pass this shape.
func dumpStore(store map[NodeID]*Node) {
	for _, node := range store {
		fmt.Println("DUMPSTORE:", node.index, node.core.DEBUG_getLocator())
		node.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
	}
}
  338. func dumpDHTSize(store map[[32]byte]*Node) {
  339. var min, max, sum int
  340. for _, node := range store {
  341. num := node.core.DEBUG_getDHTSize()
  342. min = num
  343. max = num
  344. break
  345. }
  346. for _, node := range store {
  347. num := node.core.DEBUG_getDHTSize()
  348. if num < min {
  349. min = num
  350. }
  351. if num > max {
  352. max = num
  353. }
  354. sum += num
  355. }
  356. avg := float64(sum) / float64(len(store))
  357. fmt.Printf("DHT min %d / avg %f / max %d\n", min, avg, max)
  358. }
// startTCP opens a TCP listener on the given address, connecting the
// simulated node to the real network.
func (n *Node) startTCP(listen string) {
	n.core.DEBUG_setupAndStartGlobalTCPInterface(listen)
}
// connectTCP adds remoteAddr as a peer of this node. The same string is
// passed for both AddPeer arguments — presumably the peer URI and an
// interface/label; confirm against Core.AddPeer's signature.
func (n *Node) connectTCP(remoteAddr string) {
	n.core.AddPeer(remoteAddr, remoteAddr)
}
  365. ////////////////////////////////////////////////////////////////////////////////
// Command-line flags enabling CPU and memory profiling of the sim run.
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
var memprofile = flag.String("memprofile", "", "write memory profile to this file")
  368. func main() {
  369. flag.Parse()
  370. if *cpuprofile != "" {
  371. f, err := os.Create(*cpuprofile)
  372. if err != nil {
  373. panic(fmt.Sprintf("could not create CPU profile: ", err))
  374. }
  375. if err := pprof.StartCPUProfile(f); err != nil {
  376. panic(fmt.Sprintf("could not start CPU profile: ", err))
  377. }
  378. defer pprof.StopCPUProfile()
  379. }
  380. if *memprofile != "" {
  381. f, err := os.Create(*memprofile)
  382. if err != nil {
  383. panic(fmt.Sprintf("could not create memory profile: ", err))
  384. }
  385. defer func() { pprof.WriteHeapProfile(f); f.Close() }()
  386. }
  387. fmt.Println("Test")
  388. Util_testAddrIDMask()
  389. idxstore := makeStoreSquareGrid(4)
  390. //idxstore := makeStoreStar(256)
  391. //idxstore := loadGraph("misc/sim/hype-2016-09-19.list")
  392. //idxstore := loadGraph("misc/sim/fc00-2017-08-12.txt")
  393. //idxstore := loadGraph("skitter")
  394. kstore := getKeyedStore(idxstore)
  395. //*
  396. logger := log.New(os.Stderr, "", log.Flags())
  397. for _, n := range kstore {
  398. n.core.DEBUG_setLogger(logger)
  399. }
  400. //*/
  401. startNetwork(kstore)
  402. //time.Sleep(10*time.Second)
  403. // Note that testPaths only works if pressure is turned off
  404. // Otherwise congestion can lead to routing loops?
  405. for finished := false; !finished; {
  406. finished = testPaths(kstore)
  407. }
  408. pingNodes(kstore)
  409. //pingBench(kstore) // Only after disabling debug output
  410. //stressTest(kstore)
  411. //time.Sleep(120 * time.Second)
  412. dumpDHTSize(kstore) // note that this uses racey functions to read things...
  413. if false {
  414. // This connects the sim to the local network
  415. for _, node := range kstore {
  416. node.startTCP("localhost:0")
  417. node.connectTCP("localhost:12345")
  418. break // just 1
  419. }
  420. for _, node := range kstore {
  421. go func() {
  422. // Just dump any packets sent to this node
  423. for range node.recv {
  424. }
  425. }()
  426. }
  427. var block chan struct{}
  428. <-block
  429. }
  430. runtime.GC()
  431. }