[搜片神器]之DHT办事器的代码实现办法   
               添加时间:2013-7-26 点击量: 
 
              
持续接着第一篇写:应用C#实现DHT磁力搜刮的BT种子后端经管法度+数据库设计(开源)[搜片神器]
开源地址:https://github.com/h31h31/H31DHTMgr
法度:H31DHT
 
看大师对昨天此类文章的爱好没有第一篇高,今天就简单的对支撑的伴侣进行交换.园子里的伴侣授大师以渔,所以这项目组代码就先不放出来.大师更多的参加进来.
也谁有才能将C++的代码转换成C#的,添加到我们的搜片神器对象里面.
昨天经由过程向大师介绍DHT的工作道理,信赖大师可能熟悉打听怎么回事,不熟悉打听的伴侣可以持续分享接下来的文章.
 本人鉴戒的代码是C++版本的:transmission里面的DHT代码,大师可以接见网站:http://www.transmissionbt.com/ 
不过里面的代码景象是LINUX下的,须要本身转换到响应的WIN平台上来.
有爱好应用C#来完成DHT功能的伴侣可以鉴戒mono-monotorrent,里面的框架代码斗劲多,不如C++的transmission里面就三个文件来得熟悉打听.

transmission里面只有三个文件就可以实现dht的功能: dht.c dht.h dht-example.c,并且接口很简单,复用性很好。
下面介绍进入DHT收集首要功能步调
dht.c dht.h代码分成三项目组: 
1、路由表的插入操纵。
1)若是节点已经在路由表中,则更新节点,返回。
2)若是桶没有满,则插入,返回。
3)若是发明失效节点,调换,返回。
4)发明可疑节点,则保存新节点到缓存中并且若是该可疑节点没有ping,发出ping_node操纵,返回。
5)如今,桶已经充斥了好的节点,若是本身的ID没有落在这个桶中,返回。
6)将桶空间分成两半。跳到步调1)。
2、KAD长途处理惩罚调用。
这项目组又分成3种,
1)ping/pong操纵。
所有的包的tid都应用pg\0\0
2)find_node操纵。
所有的包的tid都应用fn\0\0
3)get_peers/annouce_peer操纵。
对同一个HASH的一次递归查询中,tid对峙不变。
此中只有3)种实现bittorrent的DHT规范里面提到的递归查询操纵,1)和2)仅仅用来保护路由表,并且不保存状况。
3、按时器处理惩罚:
为了检测路由表中节点的有效性(按照规范,路由表中应当只保存有效节点),在代码中,在履行krpc操纵时若是发明时对路由表中的节点操纵,那么则保存操纵的开端时候 
pinged_time,经由过程操纵的开端时候来断定操纵是否超时。
expire_stuff_time 
超不时,会履行下面的操纵:
1、搜检路由表中失效的节点(按照pinged_time来剖断),并将该节点删除。
2、搜检用来保存annoounce_peer的节点是否跨越30分钟(这个不筹算深切评论辩论,故不做解析)。
3、搜检递归查询操纵超时。
rotate_secrets_time 
按时器。
用来每隔大约15分阁下就调换token(见DHT规范).
confirm_nodes_time 
按时器。
查找长久没有活动的桶,然后经由过程履行一个find_node的krpc操纵来刷新它。
search_time按时器。
有可能呈现发出的所有的get_peers操纵,都没有应答,那么search_time按时器碰到这种景象时负责重发所有恳求。(重视: 
get_peers操纵最大未决的krpc恳求数是3)
用于保持路由表的ping/pong操纵:
在试图插入节点时,发明桶已经满,而存在可疑节点时会触发ping_node操纵。未响应的节点会有可疑终极变为失效节点,而被调换。
下面介绍我们是如何进入DHT收集
- DHT必须把本身电脑当办事器,别人才干够知道本身在谁,所以须要经由过程UDP绑定端口,参考代码里面支撑IPV6,小我感觉可以过滤掉.WIN平台代码如下:
 
- 

 1     //初始化socket
 2     m_soListen =(int)socket(PF_INET, SOCK_DGRAM, IPPROTO_IP);
 3     if (m_soListen == INVALID_SOCKET) {
 4         m_iErrorNo=WSAGetLastError();
 5         _dout(_T(CH31CarMonitorDlg Start Error(%d).\n),m_iErrorNo);
 6         return -1;
 7     }
 8     //初始化办事器地址
 9     SOCKADDR_IN addr;
10     memset(&addr, 0, sizeof(addr));
11     addr.sin_family = AF_INET;
12     addr.sin_port = htons(port);
13     addr.sin_addr.s_addr = htonl(INADDR_ANY);
14     //绑定端口
15     if (bind(m_soListen, (SOCKADDR)&addr, sizeof(addr)) == SOCKET_ERROR) {
16         m_iErrorNo=WSAGetLastError();
17         _dout(_T(CH31CarMonitorDlg Start Error(%d).\n),m_iErrorNo);
18         return -2;
19     }
UDP端口绑定
 
- DHT须要生成一个本身的20位ID号,当然可以经由过程随机一个数值,然后经由过程SHA1来生成20位的ID号,WIN平台代码如下:
 
- 

1 unsigned char p[20];
2 CSHA1 sha1;
3 sha1.Reset();
4 sha1.Update((const unsigned char )m_myID.GetBuffer(),   m_myID.GetLength());
5 sha1.Final();
6 sha1.GetHash(p);
SHA1生成ID号
 
- 初始化他人办事器的IP信息,如许我们就可以从他们那边查询我们要的信息,鉴戒代码如下:
 
- 

 1     rc = getaddrinfo(router.utorrent.com,6881, &hints1, &info);
 2     //rc = getaddrinfo(router.bittorrent.com,6881, &hints1, &info);
 3     //rc = getaddrinfo(dht.transmissionbt.com,6881, &hints1, &info);
 4     if(rc != 0) {
 5         fprintf(stderr, getaddrinfo: %s\n, gai_strerror(rc));
 6         exit(1);
 7     }
 8     infop = info;
 9     while(infop&&m_bDataThread) 
10     {
11         memcpy(&bootstrap_nodes[num_bootstrap_nodes],infop->ai_addr, infop->ai_addrlen);
12         infop = infop->ai_next;
13         num_bootstrap_nodes++;
14     }
15     freeaddrinfo(info);
办事器信息
 
- 如今就可以初始化我们的DHT类了.因为此类应用C写的,大师可以自行封装成C++类应用.
 
- 

1     rc = m_dht.dht_init(s, s6, m_myid,NULL);
2     if(rc < 0) {
3         perror(dht_init);
4         exit(1);
5     }
初始化DHT类
 
- 对办事器进行PING操纵,办事器就会回应PONG操纵,如许就注解办事器活动正常.
 
- 

1     for(int i = 0; i < num_bootstrap_nodes&&m_bDataThread; i++) 
2     {
3         m_dht.dht_ping_node((struct sockaddr)&bootstrap_nodes[i],sizeof(bootstrap_nodes[i]));
4         Sleep(m_dht.random() % 1000);
5     }
PING办事器
 
- 下面就可以应用搜刮类进行操纵,查询我们要的HASH值的BT种子文件代码.鉴戒代码如下:

1  if(searching) {
2             if(s >= 0)
3                 dht_search(hash, 0, AF_INET, callback, NULL);
4             if(s6 >= 0)
5                 dht_search(hash, 0, AF_INET6, callback, NULL);
6             searching = 0;
7         }
dht_search
 
- 大师可以鉴戒dht-example.c里面接下来的Search函数的操纵,不过我们不是如许来的,我们须要直接向办事器发送Findnode和Get_Peer操纵.

1                 unsigned char tid[16];
2                 m_dht.make_tid(tid, fn, 0);
3                 m_dht.send_find_node(&ipRecvPingList[ipListPOS].addr,sizeof(sockaddr),tid,4,ipRecvPingList[ipListPOS].ID,0,0);
4                 Sleep(100);
5                 memset(tid,0,sizeof(tid));
6                 m_dht.make_tid(tid, gp, 0);
7                 m_dht.send_get_peers(&ipRecvPingList[ipListPOS].addr,sizeof(sockaddr),tid,4,hashList[0],0,0);
发送FINDNODE和GET_PEER操纵
 
- 接下来的工作就是守候别人返回的信息进行解析就可以了,当然DHT类代码已经全部为我们做好的.

 1         FD_ZERO(&readfds);
 2         if(m_soListen >= 0)
 3             FD_SET(m_soListen, &readfds);
 4         if(s6 >= 0)
 5             FD_SET(s6, &readfds);
 6         rc = (m_soListen > s6 ? m_soListen + 1 : s6 + 1, &readfds, NULL, NULL, &tv);
 7         if(rc <0&&m_bDataThread) 
 8         {
 9             if(errno != EINTR) {
10                 perror();
11                 Sleep(1000);
12             }
13         }
14         
15         if(!m_bDataThread)
16             break;
17 
18         if(rc > 0&&m_bDataThread) 
19         {
20             len = sizeof(1);
21             memset(buf,0,sizeof(buf));
22             if(m_soListen >= 0 && FD_ISSET(m_soListen, &readfds))
23                 rc = recv(m_soListen, buf, sizeof(buf) - 1, 0,&1, &len);
24             else if(s6 >= 0 && FD_ISSET(s6, &readfds))
25                 rc = recv(s6, buf, sizeof(buf) - 1, 0,&1, &len);
26             else
27                 abort();
28         }
29 
30         if(rc > 0&&m_bDataThread) 
31         {
32             buf[rc] = \0;
33             rc = m_dht.dht_periodic(buf, rc, &1, len,&tosleep, DHT_callback, this);
34 
35         } 
36         else 
37         {
38             rc = m_dht.dht_periodic(NULL, 0, NULL, 0, &tosleep, DHT_callback, this);
39         }
守候返回DHT收集信息
 
- 如何解析信息DHT代码已经有了,如何别人的恳求,代码也已经有了,大师可以解析DHT.c就知道是怎么回事.

  1 int CDHT::dht_periodic(const void buf, size_t buflen,const struct sockaddr Addr, int len,time_t tosleep,dht_callback callback, void closure)
  2 {
  3     gettimeofday(&nowTime, NULL);
  4 
  5     if(buflen > 0) 
  6     {
  7         int message;
  8         unsigned char tid[16], id[20], info_hash[20], target[20];
  9         unsigned char nodes[256], nodes6[1024], token[128];
 10         int tid_len = 16, token_len = 128;
 11         int nodes_len = 256, nodes6_len = 1024;
 12         unsigned short port;
 13         unsigned char values[2048], values6[2048];
 14         int values_len = 2048, values6_len = 2048;
 15         int want;
 16         unsigned short ttid;
 17 
 18         struct sockaddr_in tempip=(struct sockaddr_in )Addr;
 19 
 20         if(is_martian(Addr))
 21             goto dontread;
 22 
 23         if(node_blacklisted(Addr, len)) {
 24             _dout(Received packet  blacklisted node.\n);
 25             goto dontread;
 26         }
 27 
 28         if(((char)buf)[buflen] != \0) {
 29             _dout(Unterminated message.\n);
 30             errno = EINVAL;
 31             return -1;
 32         }
 33 
 34         message = parse_message((unsigned char )buf, buflen, tid, &tid_len, id, info_hash,target, &port, token, &token_len,nodes, &nodes_len, nodes6, &nodes6_len,values, &values_len, values6, &values6_len,&want);
 35 
 36         if(token_len>0)
 37         {
 38             int a=0;
 39         }
 40         if(message < 0 || message == ERROR || id_cmp(id, zeroes) == 0) 
 41         {
 42             _dout(Unparseable message: );
 43             debug_printable((const unsigned char )buf, buflen);
 44             _dout(\n);
 45             goto dontread;
 46         }
 47 
 48         if(id_cmp(id, myid) == 0) {
 49             _dout(Received message  self.\n);
 50             goto dontread;
 51         }
 52 
 53         if(message > REPLY) {
 54             / Rate limit requests. /
 55             if(!token_bucket()) {
 56                 _dout(Dropping request due to rate limiting.\n);
 57                 goto dontread;
 58             }
 59         }
 60 
 61         switch(message) 
 62         {
 63         case REPLY:
 64             if(tid_len != 4) 
 65             {
 66                 _dout(Broken node s transaction ids: );
 67                 debug_printable((const unsigned char )buf, buflen);
 68                 _dout(\n);
 69                 / This is really annoying, as it means that we will
 70                    time-out all our searches that go through this node.
 71                    Kill it. /
 72                 blacklist_node(id, Addr, len);
 73                 goto dontread;
 74             }
 75             if(tid_match(tid, pn, NULL)) 
 76             {
 77                 _dout(Pong!From IP:%s:[%d] id:[%s]\n,inet_ntoa(tempip->sin_addr),tempip->sin_port,id);
 78                 new_node(id, Addr, len, 2);
 79                 (callback)(closure, DHT_EVENT_PONG_VALUES,id,(void)Addr, len);
 80                 //send_find_node(,len,tid,4,id,0,0);
 81             } 
 82             else if(tid_match(tid, fn, NULL) ||tid_match(tid, gp, NULL)) 
 83             {
 84                 int gp = 0;
 85                 struct search sr = NULL;
 86                 if(tid_match(tid, gp, &ttid)) 
 87                 {
 88                     gp = 1;
 89                     sr = find_search(ttid, Addr->sa_family);
 90                 }
 91                 _dout(Nodes found (%d+%d)%s!From IP:%s:[%d]\n, nodes_len/26, nodes6_len/38,gp ?  for get_peers : ,inet_ntoa(tempip->sin_addr),tempip->sin_port);
 92                 if(nodes_len % 26 != 0 || nodes6_len % 38 != 0) 
 93                 {
 94                     _dout(Unexpected length for node info!\n);
 95                     blacklist_node(id, Addr, len);
 96                 } 
 97                 //else if(gp && sr == NULL) 
 98                 //{
 99     //                _dout(Unknown search!\n);
100     //                new_node(id, Addr, len, 1);
101     //            } 
102                 else 
103                 {
104                     int i;
105                     new_node(id, Addr, len, 2);
106                     for(i = 0; i < nodes_len / 26; i++) 
107                     {
108                         unsigned char ni = nodes + i  26;
109                         struct sockaddr_in sin;
110                         if(id_cmp(ni, myid) == 0)
111                             continue;
112                         memset(&sin, 0, sizeof(sin));
113                         sin.sin_family = AF_INET;
114                         memcpy(&sin.sin_addr, ni + 20, 4);
115                         memcpy(&sin.sin_port, ni + 24, 2);
116                         new_node(ni, (struct sockaddr)&sin, sizeof(sin), 0);
117                         (callback)(closure, DHT_EVENT_FINDNODE_VALUES, ni,(void)&sin, sizeof(sin));
118                         if(sr && sr->af == AF_INET) 
119                         {
120                             _search_node(ni,(struct sockaddr)&sin,sizeof(sin),sr, 0, NULL, 0);
121                         }
122                         //send_get_peers((struct sockaddr)&sin,sizeof(sockaddr),tid,4,ni,0,0);
123                     }
124                     for(i = 0; i < nodes6_len / 38; i++) 
125                     {
126                         unsigned char ni = nodes6 + i  38;
127                         struct sockaddr_in6 sinip6;
128                         if(id_cmp(ni, myid) == 0)
129                             continue;
130                         memset(&sinip6, 0, sizeof(sinip6));
131                         sinip6.sin6_family = AF_INET6;
132                         memcpy(&sinip6.sin6_addr, ni + 20, 16);
133                         memcpy(&sinip6.sin6_port, ni + 36, 2);
134                         new_node(ni, (struct sockaddr)&sinip6, sizeof(sinip6), 0);
135                         if(sr && sr->af == AF_INET6) 
136                         {
137                             _search_node(ni,(struct sockaddr)&sinip6,sizeof(sinip6),sr, 0, NULL, 0);
138                         }
139                     }
140                     if(sr)
141                         / Since we received a reply, the number of requests in flight has decreased.  Lets push another request. /
142                         search_send_get_peers(sr, NULL);
143                 }
144                 //if(sr) 
145                 {
146                    // _search_node(id, Addr, len, sr,1, token, token_len);
147                     if(values_len > 0 || values6_len > 0) 
148                     {
149                         _dout(Got values (%d+%d)!\n, values_len / 6, values6_len / 18);
150                         if(callback) {
151                             if(values_len > 0)
152                                 (callback)(closure, DHT_EVENT_VALUES, sr->id,(void)values, values_len);
153 
154                             if(values6_len > 0)
155                                 (callback)(closure, DHT_EVENT_VALUES6, sr->id,(void)values6, values6_len);
156                         }
157                     }
158                 }
159             } 
160             else if(tid_match(tid, ap, &ttid)) 
161             {
162                 struct search sr;
163                 _dout(Got reply to announce_peer.\n);
164                 sr = find_search(ttid, Addr->sa_family);
165                 if(!sr) {
166                     _dout(Unknown search!\n);
167                     new_node(id, Addr, len, 1);
168                 } 
169                 else 
170                 {
171                     int i;
172                     new_node(id, Addr, len, 2);
173                     for(i = 0; i < sr->numnodes; i++)
174                     {
175                         if(id_cmp(sr->nodes[i].id, id) == 0) 
176                         {
177                             sr->nodes[i].request_time = 0;
178                             sr->nodes[i].reply_time = nowTime.tv_sec;
179                             sr->nodes[i].acked = 1;
180                             sr->nodes[i].pinged = 0;
181                             break;
182                         }
183                     }
184                     / See comment for gp above. /
185                     search_send_get_peers(sr, NULL);
186                 }
187             } 
188             else 
189             {
190                 _dout(Unexpected reply: );
191                 debug_printable((const unsigned char )buf, buflen);
192                 _dout(\n);
193             }
194             break;
195         case PING:
196             _dout(Ping (%d)!From IP:%s:%d\n, tid_len,inet_ntoa(tempip->sin_addr),tempip->sin_port);
197             new_node(id, Addr, len, 1);
198             _dout(Sending pong.\n);
199             send_pong(Addr, len, tid, tid_len);
200             break;
201         case FIND_NODE:
202             _dout(Find node!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
203             new_node(id, Addr, len, 1);
204             _dout(Sending closest nodes (%d).\n, want);
205             send_closest_nodes(Addr, len,tid, tid_len, target, want,0, NULL, NULL, 0);
206             break;
207         case GET_PEERS:
208             _dout(Get_peers!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
209             new_node(id, Addr, len, 1);
210             if(id_cmp(info_hash, zeroes) == 0) 
211             {
212                 _dout(Eek!  Got get_peers with no info_hash.\n);
213                 send_error(Addr, len, tid, tid_len,203, Get_peers with no info_hash);
214                 break;
215             } 
216             else 
217             {
218                 struct storage st = find_storage(info_hash);
219                 unsigned char token[TOKEN_SIZE];
220                 make_token(Addr, 0, token);
221                 if(st && st->numpeers > 0) 
222                 {
223                      _dout(Sending found%s peers.\n,Addr->sa_family == AF_INET6 ?  IPv6 : );
224                      send_closest_nodes(Addr, len,tid, tid_len,info_hash, want,Addr->sa_family, st,token, TOKEN_SIZE);
225                 } 
226                 else 
227                 {
228                     _dout(Sending nodes for get_peers.\n);
229                     send_closest_nodes(Addr, len,tid, tid_len, info_hash, want,0, NULL, token, TOKEN_SIZE);
230                 }
231                 if(callback) 
232                 {
233                     (callback)(closure, DHT_EVENT_GET_PEER_VALUES, info_hash,(void )Addr, len);
234                 }
235             }
236 
237             break;
238         case ANNOUNCE_PEER:
239             _dout(Announce peer!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
240             new_node(id, Addr, len, 1);
241 
242             if(id_cmp(info_hash, zeroes) == 0) 
243             {
244                 _dout(Announce_peer with no info_hash.\n);
245                 send_error(Addr, len, tid, tid_len,203, Announce_peer with no info_hash);
246                 break;
247             }
248             if(!token_match(token, token_len, Addr)) {
249                 _dout(Incorrect token for announce_peer.\n);
250                 send_error(Addr, len, tid, tid_len,203, Announce_peer with wrong token);
251                 break;
252             }
253             if(port == 0) {
254                 _dout(Announce_peer with forbidden port %d.\n, port);
255                 send_error(Addr, len, tid, tid_len,203, Announce_peer with forbidden port number);
256                 break;
257             }
258             if(callback) 
259             {
260                 (callback)(closure, DHT_EVENT_ANNOUNCE_PEER_VALUES, info_hash,(void )Addr, len);
261             }
262             storage_store(info_hash, Addr, port);
263             / Note that if storage_store failed, we lie to the requestor.
264                This is to prevent them  backtracking, and hence polluting the DHT. /
265             _dout(Sending peer announced.\n);
266             send_peer_announced(Addr, len, tid, tid_len);
267         }
268     }
269 
270  dontread:
271     if(nowTime.tv_sec >= rotate_secrets_time)
272         rotate_secrets();
273 
274     if(nowTime.tv_sec >= expire_stuff_time) {
275         expire_buckets(buckets);
276         expire_buckets(buckets6);
277         expire_storage();
278         expire_searches();
279     }
280 
281     if(search_time > 0 && nowTime.tv_sec >= search_time) {
282         struct search sr;
283         sr = searches;
284         while(sr) {
285             if(!sr->done && sr->step_time + 5 <= nowTime.tv_sec) 
286             {
287                 search_step(sr, callback, closure);
288             }
289             sr = sr->next;
290         }
291 
292         search_time = 0;
293 
294         sr = searches;
295         while(sr) {
296             if(!sr->done) {
297                 time_t tm = sr->step_time + 15 + random() % 10;
298                 if(search_time == 0 || search_time > tm)
299                     search_time = tm;
300             }
301             sr = sr->next;
302         }
303     }
304 
305     if(nowTime.tv_sec >= confirm_nodes_time) {
306         int soon = 0;
307 
308         soon |= bucket_maintenance(AF_INET);
309         soon |= bucket_maintenance(AF_INET6);
310 
311         if(!soon) 
312         {
313             if(mybucket_grow_time >= nowTime.tv_sec - 150)
314                 soon |= neighbourhood_maintenance(AF_INET);
315             if(mybucket6_grow_time >= nowTime.tv_sec - 150)
316                 soon |= neighbourhood_maintenance(AF_INET6);
317         }
318 
319         / In order to maintain all buckets age within 600 seconds, worst
320            case is roughly 27 seconds, assuming the table is 22 bits deep.
321            We want to keep a margin for neighborhood maintenance, so keep
322            this within 25 seconds. /
323         if(soon)
324             confirm_nodes_time = nowTime.tv_sec + 5 + random() % 20;
325         else
326             confirm_nodes_time = nowTime.tv_sec + 60 + random() % 120;
327     }
328 
329     if(confirm_nodes_time > nowTime.tv_sec)
330         tosleep = confirm_nodes_time - nowTime.tv_sec;
331     else
332         tosleep = 0;
333 
334     if(search_time > 0) {
335         if(search_time <= nowTime.tv_sec)
336             tosleep = 0;
337         else if(tosleep > search_time - nowTime.tv_sec)
338             tosleep = search_time - nowTime.tv_sec;
339     }
340 
341     return 1;
342 }
dht_periodic
 
- 至于节点如何进行桶操纵,调试过一次代码就会熟悉打听对应的道理,当然上方也介绍了如何进行桶割据的道理.
 
- 接下来就是将上方的操纵步调进行轮回.
 
经由过程上方的流程,懂得DHT的工作办法后,如何增长更多的返回信息就须要下一篇的技巧性题目的介绍,大师一路批改我们的开源法度.
大师有不熟悉打听的处所,可以一路评论辩论.
别的求办事器进行法度测试,须要有固定IP,10G的WIN办事器空间,h31h31@163.com,感谢.
大师的推荐才是下一篇介绍的动力...
容易发怒的意思就是: 别人做了蠢事, 然后我们代替他们, 表现出笨蛋的样子。—— 蔡康永
                     
                  
     
  
 
    
    
持续接着第一篇写:应用C#实现DHT磁力搜刮的BT种子后端经管法度+数据库设计(开源)[搜片神器]
开源地址:https://github.com/h31h31/H31DHTMgr
法度:H31DHT
 
看大师对昨天此类文章的爱好没有第一篇高,今天就简单的对支撑的伴侣进行交换.园子里的伴侣授大师以渔,所以这项目组代码就先不放出来.大师更多的参加进来.
也谁有才能将C++的代码转换成C#的,添加到我们的搜片神器对象里面.
昨天经由过程向大师介绍DHT的工作道理,信赖大师可能熟悉打听怎么回事,不熟悉打听的伴侣可以持续分享接下来的文章.
本人鉴戒的代码是C++版本的:transmission里面的DHT代码,大师可以接见网站:http://www.transmissionbt.com/
不过里面的代码景象是LINUX下的,须要本身转换到响应的WIN平台上来.
有爱好应用C#来完成DHT功能的伴侣可以鉴戒mono-monotorrent,里面的框架代码斗劲多,不如C++的transmission里面就三个文件来得熟悉打听.

transmission里面只有三个文件就可以实现dht的功能: dht.c dht.h dht-example.c,并且接口很简单,复用性很好。
下面介绍进入DHT收集首要功能步调
dht.c dht.h代码分成三项目组: 
1、路由表的插入操纵。
1)若是节点已经在路由表中,则更新节点,返回。
2)若是桶没有满,则插入,返回。
3)若是发明失效节点,调换,返回。
4)发明可疑节点,则保存新节点到缓存中并且若是该可疑节点没有ping,发出ping_node操纵,返回。
5)如今,桶已经充斥了好的节点,若是本身的ID没有落在这个桶中,返回。
6)将桶空间分成两半。跳到步调1)。
2、KAD长途处理惩罚调用。
这项目组又分成3种,
1)ping/pong操纵。
所有的包的tid都应用pg\0\0
2)find_node操纵。
所有的包的tid都应用fn\0\0
3)get_peers/annouce_peer操纵。
对同一个HASH的一次递归查询中,tid对峙不变。
此中只有3)种实现bittorrent的DHT规范里面提到的递归查询操纵,1)和2)仅仅用来保护路由表,并且不保存状况。
3、按时器处理惩罚:
为了检测路由表中节点的有效性(按照规范,路由表中应当只保存有效节点),在代码中,在履行krpc操纵时若是发明时对路由表中的节点操纵,那么则保存操纵的开端时候 
pinged_time,经由过程操纵的开端时候来断定操纵是否超时。
expire_stuff_time 
超不时,会履行下面的操纵:
1、搜检路由表中失效的节点(按照pinged_time来剖断),并将该节点删除。
2、搜检用来保存annoounce_peer的节点是否跨越30分钟(这个不筹算深切评论辩论,故不做解析)。
3、搜检递归查询操纵超时。
rotate_secrets_time 
按时器。
用来每隔大约15分阁下就调换token(见DHT规范).
confirm_nodes_time 
按时器。
查找长久没有活动的桶,然后经由过程履行一个find_node的krpc操纵来刷新它。
search_time按时器。
有可能呈现发出的所有的get_peers操纵,都没有应答,那么search_time按时器碰到这种景象时负责重发所有恳求。(重视: 
get_peers操纵最大未决的krpc恳求数是3)
用于保持路由表的ping/pong操纵:
在试图插入节点时,发明桶已经满,而存在可疑节点时会触发ping_node操纵。未响应的节点会有可疑终极变为失效节点,而被调换。
下面介绍我们是如何进入DHT收集
- DHT必须把本身电脑当办事器,别人才干够知道本身在谁,所以须要经由过程UDP绑定端口,参考代码里面支撑IPV6,小我感觉可以过滤掉.WIN平台代码如下:
 - 

1 //初始化socket
2 m_soListen =(int)socket(PF_INET, SOCK_DGRAM, IPPROTO_IP);
3 if (m_soListen == INVALID_SOCKET) {
4 m_iErrorNo=WSAGetLastError();
5 _dout(_T(CH31CarMonitorDlg Start Error(%d).\n),m_iErrorNo);
6 return -1;
7 }
8 //初始化办事器地址
9 SOCKADDR_IN addr;
10 memset(&addr, 0, sizeof(addr));
11 addr.sin_family = AF_INET;
12 addr.sin_port = htons(port);
13 addr.sin_addr.s_addr = htonl(INADDR_ANY);
14 //绑定端口
15 if (bind(m_soListen, (SOCKADDR)&addr, sizeof(addr)) == SOCKET_ERROR) {
16 m_iErrorNo=WSAGetLastError();
17 _dout(_T(CH31CarMonitorDlg Start Error(%d).\n),m_iErrorNo);
18 return -2;
19 }
UDP端口绑定 - DHT须要生成一个本身的20位ID号,当然可以经由过程随机一个数值,然后经由过程SHA1来生成20位的ID号,WIN平台代码如下:
 - 

1 unsigned char p[20];
2 CSHA1 sha1;
3 sha1.Reset();
4 sha1.Update((const unsigned char )m_myID.GetBuffer(), m_myID.GetLength());
5 sha1.Final();
6 sha1.GetHash(p);
SHA1生成ID号 - 初始化他人办事器的IP信息,如许我们就可以从他们那边查询我们要的信息,鉴戒代码如下:
 - 

1 rc = getaddrinfo(router.utorrent.com,6881, &hints1, &info);
2 //rc = getaddrinfo(router.bittorrent.com,6881, &hints1, &info);
3 //rc = getaddrinfo(dht.transmissionbt.com,6881, &hints1, &info);
4 if(rc != 0) {
5 fprintf(stderr, getaddrinfo: %s\n, gai_strerror(rc));
6 exit(1);
7 }
8 infop = info;
9 while(infop&&m_bDataThread)
10 {
11 memcpy(&bootstrap_nodes[num_bootstrap_nodes],infop->ai_addr, infop->ai_addrlen);
12 infop = infop->ai_next;
13 num_bootstrap_nodes++;
14 }
15 freeaddrinfo(info);
办事器信息 - 如今就可以初始化我们的DHT类了.因为此类应用C写的,大师可以自行封装成C++类应用.
 - 

1 rc = m_dht.dht_init(s, s6, m_myid,NULL);
2 if(rc < 0) {
3 perror(dht_init);
4 exit(1);
5 }
初始化DHT类 - 对办事器进行PING操纵,办事器就会回应PONG操纵,如许就注解办事器活动正常.
 - 

1 for(int i = 0; i < num_bootstrap_nodes&&m_bDataThread; i++)
2 {
3 m_dht.dht_ping_node((struct sockaddr)&bootstrap_nodes[i],sizeof(bootstrap_nodes[i]));
4 Sleep(m_dht.random() % 1000);
5 }
PING办事器 - 下面就可以应用搜刮类进行操纵,查询我们要的HASH值的BT种子文件代码.鉴戒代码如下:

1 if(searching) {
2 if(s >= 0)
3 dht_search(hash, 0, AF_INET, callback, NULL);
4 if(s6 >= 0)
5 dht_search(hash, 0, AF_INET6, callback, NULL);
6 searching = 0;
7 }
dht_search - 大师可以鉴戒dht-example.c里面接下来的Search函数的操纵,不过我们不是如许来的,我们须要直接向办事器发送Findnode和Get_Peer操纵.

1 unsigned char tid[16];
2 m_dht.make_tid(tid, fn, 0);
3 m_dht.send_find_node(&ipRecvPingList[ipListPOS].addr,sizeof(sockaddr),tid,4,ipRecvPingList[ipListPOS].ID,0,0);
4 Sleep(100);
5 memset(tid,0,sizeof(tid));
6 m_dht.make_tid(tid, gp, 0);
7 m_dht.send_get_peers(&ipRecvPingList[ipListPOS].addr,sizeof(sockaddr),tid,4,hashList[0],0,0);
发送FINDNODE和GET_PEER操纵 - 接下来的工作就是守候别人返回的信息进行解析就可以了,当然DHT类代码已经全部为我们做好的.

1 FD_ZERO(&readfds);
2 if(m_soListen >= 0)
3 FD_SET(m_soListen, &readfds);
4 if(s6 >= 0)
5 FD_SET(s6, &readfds);
6 rc = (m_soListen > s6 ? m_soListen + 1 : s6 + 1, &readfds, NULL, NULL, &tv);
7 if(rc <0&&m_bDataThread)
8 {
9 if(errno != EINTR) {
10 perror();
11 Sleep(1000);
12 }
13 }
14
15 if(!m_bDataThread)
16 break;
17
18 if(rc > 0&&m_bDataThread)
19 {
20 len = sizeof(1);
21 memset(buf,0,sizeof(buf));
22 if(m_soListen >= 0 && FD_ISSET(m_soListen, &readfds))
23 rc = recv(m_soListen, buf, sizeof(buf) - 1, 0,&1, &len);
24 else if(s6 >= 0 && FD_ISSET(s6, &readfds))
25 rc = recv(s6, buf, sizeof(buf) - 1, 0,&1, &len);
26 else
27 abort();
28 }
29
30 if(rc > 0&&m_bDataThread)
31 {
32 buf[rc] = \0;
33 rc = m_dht.dht_periodic(buf, rc, &1, len,&tosleep, DHT_callback, this);
34
35 }
36 else
37 {
38 rc = m_dht.dht_periodic(NULL, 0, NULL, 0, &tosleep, DHT_callback, this);
39 }
守候返回DHT收集信息 - 如何解析信息DHT代码已经有了,如何别人的恳求,代码也已经有了,大师可以解析DHT.c就知道是怎么回事.

1 int CDHT::dht_periodic(const void buf, size_t buflen,const struct sockaddr Addr, int len,time_t tosleep,dht_callback callback, void closure)
2 {
3 gettimeofday(&nowTime, NULL);
4
5 if(buflen > 0)
6 {
7 int message;
8 unsigned char tid[16], id[20], info_hash[20], target[20];
9 unsigned char nodes[256], nodes6[1024], token[128];
10 int tid_len = 16, token_len = 128;
11 int nodes_len = 256, nodes6_len = 1024;
12 unsigned short port;
13 unsigned char values[2048], values6[2048];
14 int values_len = 2048, values6_len = 2048;
15 int want;
16 unsigned short ttid;
17
18 struct sockaddr_in tempip=(struct sockaddr_in )Addr;
19
20 if(is_martian(Addr))
21 goto dontread;
22
23 if(node_blacklisted(Addr, len)) {
24 _dout(Received packet blacklisted node.\n);
25 goto dontread;
26 }
27
28 if(((char)buf)[buflen] != \0) {
29 _dout(Unterminated message.\n);
30 errno = EINVAL;
31 return -1;
32 }
33
34 message = parse_message((unsigned char )buf, buflen, tid, &tid_len, id, info_hash,target, &port, token, &token_len,nodes, &nodes_len, nodes6, &nodes6_len,values, &values_len, values6, &values6_len,&want);
35
36 if(token_len>0)
37 {
38 int a=0;
39 }
40 if(message < 0 || message == ERROR || id_cmp(id, zeroes) == 0)
41 {
42 _dout(Unparseable message: );
43 debug_printable((const unsigned char )buf, buflen);
44 _dout(\n);
45 goto dontread;
46 }
47
48 if(id_cmp(id, myid) == 0) {
49 _dout(Received message self.\n);
50 goto dontread;
51 }
52
53 if(message > REPLY) {
54 / Rate limit requests. /
55 if(!token_bucket()) {
56 _dout(Dropping request due to rate limiting.\n);
57 goto dontread;
58 }
59 }
60
61 switch(message)
62 {
63 case REPLY:
64 if(tid_len != 4)
65 {
66 _dout(Broken node s transaction ids: );
67 debug_printable((const unsigned char )buf, buflen);
68 _dout(\n);
69 / This is really annoying, as it means that we will
70 time-out all our searches that go through this node.
71 Kill it. /
72 blacklist_node(id, Addr, len);
73 goto dontread;
74 }
75 if(tid_match(tid, pn, NULL))
76 {
77 _dout(Pong!From IP:%s:[%d] id:[%s]\n,inet_ntoa(tempip->sin_addr),tempip->sin_port,id);
78 new_node(id, Addr, len, 2);
79 (callback)(closure, DHT_EVENT_PONG_VALUES,id,(void)Addr, len);
80 //send_find_node(,len,tid,4,id,0,0);
81 }
82 else if(tid_match(tid, fn, NULL) ||tid_match(tid, gp, NULL))
83 {
84 int gp = 0;
85 struct search sr = NULL;
86 if(tid_match(tid, gp, &ttid))
87 {
88 gp = 1;
89 sr = find_search(ttid, Addr->sa_family);
90 }
91 _dout(Nodes found (%d+%d)%s!From IP:%s:[%d]\n, nodes_len/26, nodes6_len/38,gp ? for get_peers : ,inet_ntoa(tempip->sin_addr),tempip->sin_port);
92 if(nodes_len % 26 != 0 || nodes6_len % 38 != 0)
93 {
94 _dout(Unexpected length for node info!\n);
95 blacklist_node(id, Addr, len);
96 }
97 //else if(gp && sr == NULL)
98 //{
99 // _dout(Unknown search!\n);
100 // new_node(id, Addr, len, 1);
101 // }
102 else
103 {
104 int i;
105 new_node(id, Addr, len, 2);
106 for(i = 0; i < nodes_len / 26; i++)
107 {
108 unsigned char ni = nodes + i 26;
109 struct sockaddr_in sin;
110 if(id_cmp(ni, myid) == 0)
111 continue;
112 memset(&sin, 0, sizeof(sin));
113 sin.sin_family = AF_INET;
114 memcpy(&sin.sin_addr, ni + 20, 4);
115 memcpy(&sin.sin_port, ni + 24, 2);
116 new_node(ni, (struct sockaddr)&sin, sizeof(sin), 0);
117 (callback)(closure, DHT_EVENT_FINDNODE_VALUES, ni,(void)&sin, sizeof(sin));
118 if(sr && sr->af == AF_INET)
119 {
120 _search_node(ni,(struct sockaddr)&sin,sizeof(sin),sr, 0, NULL, 0);
121 }
122 //send_get_peers((struct sockaddr)&sin,sizeof(sockaddr),tid,4,ni,0,0);
123 }
124 for(i = 0; i < nodes6_len / 38; i++)
125 {
126 unsigned char ni = nodes6 + i 38;
127 struct sockaddr_in6 sinip6;
128 if(id_cmp(ni, myid) == 0)
129 continue;
130 memset(&sinip6, 0, sizeof(sinip6));
131 sinip6.sin6_family = AF_INET6;
132 memcpy(&sinip6.sin6_addr, ni + 20, 16);
133 memcpy(&sinip6.sin6_port, ni + 36, 2);
134 new_node(ni, (struct sockaddr)&sinip6, sizeof(sinip6), 0);
135 if(sr && sr->af == AF_INET6)
136 {
137 _search_node(ni,(struct sockaddr)&sinip6,sizeof(sinip6),sr, 0, NULL, 0);
138 }
139 }
140 if(sr)
141 / Since we received a reply, the number of requests in flight has decreased. Lets push another request. /
142 search_send_get_peers(sr, NULL);
143 }
144 //if(sr)
145 {
146 // _search_node(id, Addr, len, sr,1, token, token_len);
147 if(values_len > 0 || values6_len > 0)
148 {
149 _dout(Got values (%d+%d)!\n, values_len / 6, values6_len / 18);
150 if(callback) {
151 if(values_len > 0)
152 (callback)(closure, DHT_EVENT_VALUES, sr->id,(void)values, values_len);
153
154 if(values6_len > 0)
155 (callback)(closure, DHT_EVENT_VALUES6, sr->id,(void)values6, values6_len);
156 }
157 }
158 }
159 }
160 else if(tid_match(tid, ap, &ttid))
161 {
162 struct search sr;
163 _dout(Got reply to announce_peer.\n);
164 sr = find_search(ttid, Addr->sa_family);
165 if(!sr) {
166 _dout(Unknown search!\n);
167 new_node(id, Addr, len, 1);
168 }
169 else
170 {
171 int i;
172 new_node(id, Addr, len, 2);
173 for(i = 0; i < sr->numnodes; i++)
174 {
175 if(id_cmp(sr->nodes[i].id, id) == 0)
176 {
177 sr->nodes[i].request_time = 0;
178 sr->nodes[i].reply_time = nowTime.tv_sec;
179 sr->nodes[i].acked = 1;
180 sr->nodes[i].pinged = 0;
181 break;
182 }
183 }
184 / See comment for gp above. /
185 search_send_get_peers(sr, NULL);
186 }
187 }
188 else
189 {
190 _dout(Unexpected reply: );
191 debug_printable((const unsigned char )buf, buflen);
192 _dout(\n);
193 }
194 break;
195 case PING:
196 _dout(Ping (%d)!From IP:%s:%d\n, tid_len,inet_ntoa(tempip->sin_addr),tempip->sin_port);
197 new_node(id, Addr, len, 1);
198 _dout(Sending pong.\n);
199 send_pong(Addr, len, tid, tid_len);
200 break;
201 case FIND_NODE:
202 _dout(Find node!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
203 new_node(id, Addr, len, 1);
204 _dout(Sending closest nodes (%d).\n, want);
205 send_closest_nodes(Addr, len,tid, tid_len, target, want,0, NULL, NULL, 0);
206 break;
207 case GET_PEERS:
208 _dout(Get_peers!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
209 new_node(id, Addr, len, 1);
210 if(id_cmp(info_hash, zeroes) == 0)
211 {
212 _dout(Eek! Got get_peers with no info_hash.\n);
213 send_error(Addr, len, tid, tid_len,203, Get_peers with no info_hash);
214 break;
215 }
216 else
217 {
218 struct storage st = find_storage(info_hash);
219 unsigned char token[TOKEN_SIZE];
220 make_token(Addr, 0, token);
221 if(st && st->numpeers > 0)
222 {
223 _dout(Sending found%s peers.\n,Addr->sa_family == AF_INET6 ? IPv6 : );
224 send_closest_nodes(Addr, len,tid, tid_len,info_hash, want,Addr->sa_family, st,token, TOKEN_SIZE);
225 }
226 else
227 {
228 _dout(Sending nodes for get_peers.\n);
229 send_closest_nodes(Addr, len,tid, tid_len, info_hash, want,0, NULL, token, TOKEN_SIZE);
230 }
231 if(callback)
232 {
233 (callback)(closure, DHT_EVENT_GET_PEER_VALUES, info_hash,(void )Addr, len);
234 }
235 }
236
237 break;
238 case ANNOUNCE_PEER:
239 _dout(Announce peer!From IP:%s:%d\n,inet_ntoa(tempip->sin_addr),tempip->sin_port);
240 new_node(id, Addr, len, 1);
241
242 if(id_cmp(info_hash, zeroes) == 0)
243 {
244 _dout(Announce_peer with no info_hash.\n);
245 send_error(Addr, len, tid, tid_len,203, Announce_peer with no info_hash);
246 break;
247 }
248 if(!token_match(token, token_len, Addr)) {
249 _dout(Incorrect token for announce_peer.\n);
250 send_error(Addr, len, tid, tid_len,203, Announce_peer with wrong token);
251 break;
252 }
253 if(port == 0) {
254 _dout(Announce_peer with forbidden port %d.\n, port);
255 send_error(Addr, len, tid, tid_len,203, Announce_peer with forbidden port number);
256 break;
257 }
258 if(callback)
259 {
260 (callback)(closure, DHT_EVENT_ANNOUNCE_PEER_VALUES, info_hash,(void )Addr, len);
261 }
262 storage_store(info_hash, Addr, port);
263 / Note that if storage_store failed, we lie to the requestor.
264 This is to prevent them backtracking, and hence polluting the DHT. /
265 _dout(Sending peer announced.\n);
266 send_peer_announced(Addr, len, tid, tid_len);
267 }
268 }
269
270 dontread:
271 if(nowTime.tv_sec >= rotate_secrets_time)
272 rotate_secrets();
273
274 if(nowTime.tv_sec >= expire_stuff_time) {
275 expire_buckets(buckets);
276 expire_buckets(buckets6);
277 expire_storage();
278 expire_searches();
279 }
280
281 if(search_time > 0 && nowTime.tv_sec >= search_time) {
282 struct search sr;
283 sr = searches;
284 while(sr) {
285 if(!sr->done && sr->step_time + 5 <= nowTime.tv_sec)
286 {
287 search_step(sr, callback, closure);
288 }
289 sr = sr->next;
290 }
291
292 search_time = 0;
293
294 sr = searches;
295 while(sr) {
296 if(!sr->done) {
297 time_t tm = sr->step_time + 15 + random() % 10;
298 if(search_time == 0 || search_time > tm)
299 search_time = tm;
300 }
301 sr = sr->next;
302 }
303 }
304
305 if(nowTime.tv_sec >= confirm_nodes_time) {
306 int soon = 0;
307
308 soon |= bucket_maintenance(AF_INET);
309 soon |= bucket_maintenance(AF_INET6);
310
311 if(!soon)
312 {
313 if(mybucket_grow_time >= nowTime.tv_sec - 150)
314 soon |= neighbourhood_maintenance(AF_INET);
315 if(mybucket6_grow_time >= nowTime.tv_sec - 150)
316 soon |= neighbourhood_maintenance(AF_INET6);
317 }
318
319 / In order to maintain all buckets age within 600 seconds, worst
320 case is roughly 27 seconds, assuming the table is 22 bits deep.
321 We want to keep a margin for neighborhood maintenance, so keep
322 this within 25 seconds. /
323 if(soon)
324 confirm_nodes_time = nowTime.tv_sec + 5 + random() % 20;
325 else
326 confirm_nodes_time = nowTime.tv_sec + 60 + random() % 120;
327 }
328
329 if(confirm_nodes_time > nowTime.tv_sec)
330 tosleep = confirm_nodes_time - nowTime.tv_sec;
331 else
332 tosleep = 0;
333
334 if(search_time > 0) {
335 if(search_time <= nowTime.tv_sec)
336 tosleep = 0;
337 else if(tosleep > search_time - nowTime.tv_sec)
338 tosleep = search_time - nowTime.tv_sec;
339 }
340
341 return 1;
342 }
dht_periodic - 至于节点如何进行桶操纵,调试过一次代码就会熟悉打听对应的道理,当然上方也介绍了如何进行桶割据的道理.
 - 接下来就是将上方的操纵步调进行轮回.
 
经由过程上方的流程,懂得DHT的工作办法后,如何增长更多的返回信息就须要下一篇的技巧性题目的介绍,大师一路批改我们的开源法度.
大师有不熟悉打听的处所,可以一路评论辩论.
别的求办事器进行法度测试,须要有固定IP,10G的WIN办事器空间,h31h31@163.com,感谢.
大师的推荐才是下一篇介绍的动力...
容易发怒的意思就是: 别人做了蠢事, 然后我们代替他们, 表现出笨蛋的样子。—— 蔡康永



