My Project 3.7.7
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "infohash.h"
6#include "value.h"
7#include "utils.h"
8#include "network_engine.h"
9#include "scheduler.h"
10#include "routing_table.h"
11#include "callbacks.h"
12#include "dht_interface.h"
13
14#include <string>
15#include <array>
16#include <vector>
17#include <map>
18#include <functional>
19#include <memory>
20
21#ifdef _WIN32
22#include <iso646.h>
23#endif
24
25namespace dht {
26
27namespace net {
28struct Request;
29} /* namespace net */
30
31struct Storage;
32struct ValueStorage;
33class StorageBucket;
34struct Listener;
35struct LocalListener;
36
44class OPENDHT_PUBLIC Dht final : public DhtInterface
45{
46public:
51 Dht(std::unique_ptr<net::DatagramSocket>&& sock,
52 const Config& config,
53 const Sp<Logger>& l = {},
54 std::unique_ptr<std::mt19937_64>&& rd = {});
55
56 virtual ~Dht();
57
61 inline const InfoHash& getNodeId() const override { return myid; }
62 void setOnPublicAddressChanged(PublicAddressChangedCb cb) override { publicAddressChangedCb_ = std::move(cb); }
63
64 NodeStatus updateStatus(sa_family_t af) override;
65
69 NodeStatus getStatus(sa_family_t af) const override { return dht(af).status; }
70
71 NodeStatus getStatus() const override { return std::max(getStatus(AF_INET), getStatus(AF_INET6)); }
72
73 net::DatagramSocket* getSocket() const override { return network_engine.getSocket(); };
74
78 void shutdown(ShutdownCallback cb, bool stop = false) override;
79
86 bool isRunning(sa_family_t af = 0) const override;
87
88 virtual void registerType(const ValueType& type) override { types.registerType(type); }
89 const ValueType& getType(ValueType::Id type_id) const override { return types.getType(type_id); }
90
91 void addBootstrap(const std::string& host, const std::string& service) override
92 {
93 bootstrap_nodes.emplace_back(host, service);
94 startBootstrap();
95 }
96
97 void clearBootstrap() override { bootstrap_nodes.clear(); }
98
104 void insertNode(const InfoHash& id, const SockAddr&) override;
105 void insertNode(const NodeExport& n) override { insertNode(n.id, n.addr); }
106
107 void pingNode(SockAddr, DoneCallbackSimple&& cb = {}) override;
108
109 time_point periodic(const uint8_t* buf, size_t buflen, SockAddr, const time_point& now) override;
110 time_point periodic(
111 const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override
112 {
113 return periodic(buf, buflen, SockAddr(from, fromlen), now);
114 }
115
126 virtual void get(
127 const InfoHash& key, GetCallback cb, DoneCallback donecb = {}, Value::Filter&& f = {}, Where&& w = {}) override;
128 virtual void get(const InfoHash& key,
129 GetCallback cb,
130 DoneCallbackSimple donecb = {},
131 Value::Filter&& f = {},
132 Where&& w = {}) override
133 {
134 get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
135 }
136 virtual void get(const InfoHash& key,
137 GetCallbackSimple cb,
138 DoneCallback donecb = {},
139 Value::Filter&& f = {},
140 Where&& w = {}) override
141 {
142 get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
143 }
144 virtual void get(const InfoHash& key,
145 GetCallbackSimple cb,
146 DoneCallbackSimple donecb,
147 Value::Filter&& f = {},
148 Where&& w = {}) override
149 {
150 get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
151 }
162 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) override;
163 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override
164 {
165 query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
166 }
167
171 std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const override;
172
176 Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const override;
177
184 void put(const InfoHash& key,
185 Sp<Value>,
186 DoneCallback cb = nullptr,
187 time_point created = time_point::max(),
188 bool permanent = false) override;
189 void put(const InfoHash& key,
190 const Sp<Value>& v,
191 DoneCallbackSimple cb,
192 time_point created = time_point::max(),
193 bool permanent = false) override
194 {
195 put(key, v, bindDoneCb(cb), created, permanent);
196 }
197
198 void put(const InfoHash& key,
199 Value&& v,
200 DoneCallback cb = nullptr,
201 time_point created = time_point::max(),
202 bool permanent = false) override
203 {
204 put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
205 }
206 void put(const InfoHash& key,
207 Value&& v,
208 DoneCallbackSimple cb,
209 time_point created = time_point::max(),
210 bool permanent = false) override
211 {
212 put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
213 }
214
218 std::vector<Sp<Value>> getPut(const InfoHash&) const override;
219
223 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
224
229 bool cancelPut(const InfoHash&, const Value::Id&) override;
230
238 size_t listen(const InfoHash&, ValueCallback, Value::Filter = {}, Where = {}) override;
239
240 size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f = {}, Where w = {}) override
241 {
242 return listen(
243 key,
244 [cb](const std::vector<Sp<Value>>& vals, bool expired) {
245 if (not expired)
246 return cb(vals);
247 return true;
248 },
249 std::forward<Value::Filter>(f),
250 std::forward<Where>(w));
251 }
252 size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) override
253 {
254 return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
255 }
256
257 bool cancelListen(const InfoHash&, size_t token) override;
258
264 void connectivityChanged(sa_family_t) override;
265 void connectivityChanged() override
266 {
267 connectivityChanged(AF_INET);
268 connectivityChanged(AF_INET6);
269 }
270
275 std::vector<NodeExport> exportNodes() const override;
276
277 std::vector<ValuesExport> exportValues() const override;
278 void importValues(const std::vector<ValuesExport>&) override;
279
280 void saveState(const std::string& path) const;
281 void loadState(const std::string& path);
282
283 NodeStats getNodesStats(sa_family_t af) const override;
284
285 std::string getStorageLog() const override;
286 std::string getStorageLog(const InfoHash&) const override;
287
288 std::string getRoutingTablesLog(sa_family_t) const override;
289 std::string getSearchesLog(sa_family_t) const override;
290 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const override;
291
292 void dumpTables() const override;
293 std::vector<unsigned> getNodeMessageStats(bool in = false) override
294 {
295 return network_engine.getNodeMessageStats(in);
296 }
297
301 void setStorageLimit(size_t limit = 0) override { max_store_size = limit == 0 ? STORAGE_LIMIT_DEFAULT : limit; }
302 size_t getStorageLimit() const override { return max_store_size; }
303
307 void setLocalStorageLimit(size_t limit = 0) override
308 {
309 max_local_store_size = limit == 0 ? STORAGE_LIMIT_UNLIMITED : limit;
310 }
311 size_t getLocalStorageLimit() const override { return max_local_store_size; }
312
317 std::pair<size_t, size_t> getStoreSize() const override { return {total_store_size, total_values}; }
318
319 std::pair<size_t, size_t> getLocalStoreSize() const override;
320
321 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
322
323 PushNotificationResult pushNotificationReceived(const std::map<std::string, std::string>&) override
324 {
325 return PushNotificationResult::IgnoredDisabled;
326 }
/** No-op: the argument is ignored and nothing is done. */
void resubscribe(unsigned /* unused */)
{
}
328
329private:
330 /* When performing a search, we search for up to SEARCH_NODES closest nodes
331 to the destination, and use the additional ones to backtrack if any of
332 the target 8 turn out to be dead. */
333 static constexpr unsigned SEARCH_NODES {14};
334
335 /* The number of bad nodes is limited in order to help determine
336 * presence of connectivity changes. See
337 * https://github.com/savoirfairelinux/opendht/issues/137 for details.
338 *
339 * According to the tables, 25 is a good average value for big networks. If
340 * the network is small, normal search expiration process will handle the
341 * situation.
342 * */
343 static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
344
345 /* Concurrent search nodes requested count */
346 static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
347
348 /* Number of listening nodes */
349 static constexpr unsigned LISTEN_NODES {4};
350
351 /* The maximum number of hashes we're willing to track. */
352 static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};
353
354 /* The maximum number of searches we keep data about. */
355 static constexpr unsigned MAX_SEARCHES {1024 * 1024};
356
357 static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
358
359 /* The time after which we consider a search to be expirable. */
360 static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
361
362 /* Timeout for listen */
363 static constexpr duration LISTEN_EXPIRE_TIME {std::chrono::seconds(30)};
364 static constexpr duration LISTEN_EXPIRE_TIME_PUBLIC {std::chrono::minutes(5)};
365
366 static constexpr duration REANNOUNCE_MARGIN {std::chrono::seconds(10)};
367
368 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
369
370 static constexpr size_t TOKEN_SIZE {32};
371
372 // internal structures
373 struct SearchNode;
374 struct Get;
375 struct Announce;
376 struct Search;
377
378 // prevent copy
379 Dht(const Dht&) = delete;
380 Dht& operator=(const Dht&) = delete;
381
382 std::mt19937_64 rd;
383
384 InfoHash myid {};
385
386 uint64_t secret {};
387 uint64_t oldsecret {};
388
389 // registered types
390 TypeStore types;
391
392 using SearchMap = std::map<InfoHash, Sp<Search>>;
393 using ReportedAddr = std::pair<unsigned, SockAddr>;
394
395 struct Kad
396 {
397 RoutingTable buckets {};
398 SearchMap searches {};
399 unsigned pending_pings {0};
400 NodeStatus status;
401 std::vector<ReportedAddr> reported_addr;
402
403 NodeStatus getStatus(time_point now) const;
404 NodeStats getNodesStats(time_point now, const InfoHash& myid) const;
405 };
406
407 Kad dht4 {};
408 Kad dht6 {};
409 PublicAddressChangedCb publicAddressChangedCb_ {};
410
411 std::vector<std::pair<std::string, std::string>> bootstrap_nodes {};
412 std::chrono::steady_clock::duration bootstrap_period {BOOTSTRAP_PERIOD};
413 Sp<Scheduler::Job> bootstrapJob {};
414 bool bootstrap_pending {false};
415
416 std::map<InfoHash, Storage> store;
417 std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
418 std::unique_ptr<StorageBucket> local_store_quota;
419 size_t total_values {0};
420 size_t total_store_size {0};
421 size_t max_store_keys {MAX_HASHES};
422 size_t max_store_size {STORAGE_LIMIT_DEFAULT};
423 size_t max_local_store_size {STORAGE_LIMIT_UNLIMITED};
424
425 size_t max_searches {MAX_SEARCHES};
426 size_t search_id {0};
427
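428 // Maps a global listen token to the per-scope listen tokens it stands for.
429 // The tuple holds three tokens although only IPv4 and IPv6 are named here;
429 // presumably the first one is a local listener token (TODO confirm).
429 // 0 is the invalid token.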
430 std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
431 size_t listener_token {1};
432
433 // timing
434 Scheduler scheduler;
435 Sp<Scheduler::Job> nextNodesConfirmation {};
436 Sp<Scheduler::Job> nextStorageMaintenance {};
437
438 net::NetworkEngine network_engine;
439
440 std::string persistPath;
441
442 // are we a bootstrap node ?
443 // note: Any running node can be used as a bootstrap node.
444 // Only nodes running only as bootstrap nodes should
445 // be put in bootstrap mode.
446 const bool is_bootstrap {false};
447 const bool maintain_storage {false};
448 const bool public_stable {false};
449
450 inline const duration& getListenExpiration() const
451 {
452 return public_stable ? LISTEN_EXPIRE_TIME_PUBLIC : LISTEN_EXPIRE_TIME;
453 }
454
455 void rotateSecrets();
456
457 Blob makeToken(const SockAddr&, bool old) const;
458 bool tokenMatch(const Blob& token, const SockAddr&) const;
459
460 void reportedAddr(const SockAddr&);
461
462 // Storage
463 void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}, int version = 0);
464 bool storageStore(const InfoHash& id,
465 const Sp<Value>& value,
466 time_point created,
467 const SockAddr& sa = {},
468 bool permanent = false,
469 time_point expiration = time_point::min());
470 bool storageRefresh(const InfoHash& id, Value::Id vid);
471 void expireStore();
472 void expireStorage(InfoHash h);
473 void expireStore(decltype(store)::iterator);
474
475 void storageRemoved(const InfoHash& id, Storage& st, const std::vector<Sp<Value>>& values, size_t totalSize);
476 void storageChanged(const InfoHash& id, Storage& st, const Sp<Value>&, bool newValue);
477 std::string printStorageLog(const decltype(store)::value_type&) const;
478
484 void dataPersistence(InfoHash id);
485 size_t maintainStorage(decltype(store)::value_type&, bool force = false, const DoneCallback& donecb = {});
486
487 // Buckets
488 Kad& dht(sa_family_t af) { return af == AF_INET ? dht4 : dht6; }
489 const Kad& dht(sa_family_t af) const { return af == AF_INET ? dht4 : dht6; }
490 RoutingTable& buckets(sa_family_t af) { return dht(af).buckets; }
491 const RoutingTable& buckets(sa_family_t af) const { return dht(af).buckets; }
492 Bucket* findBucket(const InfoHash& id, sa_family_t af)
493 {
494 auto& b = buckets(af);
495 auto it = b.findBucket(id);
496 return it == b.end() ? nullptr : &(*it);
497 }
498 const Bucket* findBucket(const InfoHash& id, sa_family_t af) const
499 {
500 return const_cast<Dht*>(this)->findBucket(id, af);
501 }
502
503 void expireBuckets(RoutingTable&);
504 void sendCachedPing(Bucket& b);
505 bool bucketMaintenance(RoutingTable&);
506 void dumpBucket(const Bucket& b, std::ostream& out) const;
507 void bootstrap();
508 void startBootstrap();
509 void stopBootstrap();
510
511 // Nodes
512 void onNewNode(const Sp<Node>& node, int confirm);
513 const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const;
514 bool trySearchInsert(const Sp<Node>& node);
515
516 // Searches
517 inline SearchMap& searches(sa_family_t af) { return dht(af).searches; }
518 inline const SearchMap& searches(sa_family_t af) const { return dht(af).searches; }
519
524 Sp<Search> search(const InfoHash& id,
525 sa_family_t af,
526 GetCallback = {},
527 QueryCallback = {},
528 DoneCallback = {},
529 Value::Filter = {},
530 const Sp<Query>& q = {});
531
532 void announce(const InfoHash& id,
533 sa_family_t af,
534 Sp<Value> value,
535 DoneCallback callback,
536 time_point created = time_point::max(),
537 bool permanent = false);
538 size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = {}, const Sp<Query>& q = {});
539
547 unsigned refill(Search& sr);
548 void expireSearches();
549
550 void confirmNodes();
551 void expire();
552
553 void onConnected();
554 void onDisconnected();
555
564 void searchNodeGetDone(const net::Request& status,
565 net::RequestAnswer&& answer,
566 std::weak_ptr<Search> ws,
567 Sp<Query> query);
568
578 void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query);
579
587 void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
588
592 SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode* n = nullptr, bool update = true);
593
600 void searchSendAnnounceValue(const Sp<Search>& sr, unsigned syncLevel = TARGET_NODES);
601
609 void searchStep(std::weak_ptr<Search> ws);
610
611 void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
612
613 void dumpSearch(const Search& sr, std::ostream& out) const;
614
615 bool neighbourhoodMaintenance(RoutingTable&);
616
617 void onError(Sp<net::Request> node, net::DhtProtocolException e);
618 /* when our address is reported by a distant peer. */
619 void onReportedAddr(const InfoHash& id, const SockAddr&);
620 /* when we receive a ping request */
621 net::RequestAnswer onPing(Sp<Node> node);
622 /* when we receive a "find node" request */
623 net::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want);
624 void onFindNodeDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search> sr);
625 /* when we receive a "get values" request */
626 net::RequestAnswer onGetValues(Sp<Node> node, const InfoHash& hash, want_t want, const Query& q);
627 void onGetValuesDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr, const Sp<Query>& orig_query);
628 /* when we receive a listen request */
629 net::RequestAnswer onListen(
630 Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query, int version = 0);
631 void onListenDone(const Sp<Node>& node, net::RequestAnswer& a, Sp<Search>& sr);
632 /* when we receive an announce request */
633 net::RequestAnswer onAnnounce(Sp<Node> node,
634 const InfoHash& hash,
635 const Blob& token,
636 const std::vector<Sp<Value>>& v,
637 const time_point& created);
638 net::RequestAnswer onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid);
639 void onAnnounceDone(const Sp<Node>& status, net::RequestAnswer& a, Sp<Search>& sr);
640};
641
642} // namespace dht
Dht(std::unique_ptr< net::DatagramSocket > &&sock, const Config &config, const Sp< Logger > &l={}, std::unique_ptr< std::mt19937_64 > &&rd={})
void insertNode(const InfoHash &id, const SockAddr &) override
NodeStatus updateStatus(sa_family_t af) override
NodeStatus getStatus(sa_family_t af) const override
Definition dht.h:69
std::pair< size_t, size_t > getStoreSize() const override
Definition dht.h:317
Sp< Value > getLocalById(const InfoHash &key, Value::Id vid) const override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
std::vector< NodeExport > exportNodes() const override
size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool cancelPut(const InfoHash &, const Value::Id &) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
void connectivityChanged(sa_family_t) override
size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
Definition dht.h:240
bool isRunning(sa_family_t af=0) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
void shutdown(ShutdownCallback cb, bool stop=false) override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
std::vector< Sp< Value > > getLocal(const InfoHash &key, const Value::Filter &f={}) const override
PushNotificationResult pushNotificationReceived(const std::map< std::string, std::string > &) override
Definition dht.h:323
void setStorageLimit(size_t limit=0) override
Definition dht.h:301
virtual void query(const InfoHash &key, QueryCallback cb, DoneCallback done_cb={}, Query &&q={}) override
const InfoHash & getNodeId() const override
Definition dht.h:61
void setLocalStorageLimit(size_t limit=0) override
Definition dht.h:307
std::vector< uint8_t > Blob
Definition utils.h:158
NodeStatus
Definition callbacks.h:24
Describes a query destined to another peer.
Definition value.h:988
Serializable dht::Value filter.
Definition value.h:850