digest
Loading...
Searching...
No Matches
thread_out.hpp
1#ifndef THREAD_OUT_HPP
2#define THREAD_OUT_HPP
3
4#include "digest/mod_minimizer.hpp"
5#include "digest/syncmer.hpp"
6#include "digest/window_minimizer.hpp"
7#include <cstdint>
8#include <future>
9#include <thread>
10#include <vector>
11
31
/**
 * @brief Exception thrown by the thread_* functions when their parameters
 * cannot describe a valid split of the sequence across threads.
 *
 * what() is public so it can be called on the derived type directly, not only
 * through a std::exception reference.
 */
class BadThreadOutParams : public std::exception {
public:
  const char *what() const noexcept override {
    // Adjacent literals are concatenated at compile time. This avoids the
    // backslash-newline continuation, which copied the source indentation
    // into the message.
    return "k must be greater than 3, start must be less than len, "
           "thread_count must be nonzero and no greater than the number of "
           "kmers/large windows, and large_wind_kmer_am can't be 0";
  }
};
43
44//------------- WORKER FUNCTIONS ----------------
45
46// function that's passed to the thread for ModMinmizers
47template <digest::BadCharPolicy P>
48std::vector<uint32_t> thread_mod_roll1(const char *seq, size_t ind, unsigned k,
49 uint32_t mod, uint32_t congruence,
50 digest::MinimizedHashType minimized_h,
51 unsigned assigned_kmer_am) {
52 std::vector<uint32_t> out;
53 digest::ModMin<P> dig(seq, ind + assigned_kmer_am + k - 1, k, mod,
54 congruence, ind, minimized_h);
55 dig.roll_minimizer(assigned_kmer_am, out);
56 return out;
57}
58
59template <digest::BadCharPolicy P>
60std::vector<std::pair<uint32_t, uint32_t>>
61thread_mod_roll2(const char *seq, size_t ind, unsigned k, uint32_t mod,
62 uint32_t congruence, digest::MinimizedHashType minimized_h,
63 unsigned assigned_kmer_am) {
64 std::vector<std::pair<uint32_t, uint32_t>> out;
65 digest::ModMin<P> dig(seq, ind + assigned_kmer_am + k - 1, k, mod,
66 congruence, ind, minimized_h);
67 dig.roll_minimizer(assigned_kmer_am, out);
68 return out;
69}
70
71// function that's passed to the thread for WindowMinimizers
// Worker launched once per chunk by thread_wind (uint32_t output). It covers
// assigned_lwind_am large windows, the first of which starts at index ind.
// The second constructor argument is
//   ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1.
// That is one past the last character of the chunk's final large window: the
// window starts at ind + assigned_lwind_am - 1 and spans
// k + large_wind_kmer_am - 1 characters. This assumes the argument is an
// exclusive end/length; confirm against window_minimizer.hpp.
72template <digest::BadCharPolicy P, class T>
73std::vector<uint32_t> thread_wind_roll1(const char *seq, size_t ind, unsigned k,
74 uint32_t large_wind_kmer_am,
75 digest::MinimizedHashType minimized_h,
76 unsigned assigned_lwind_am) {
77 std::vector<uint32_t> out;
// NOTE(review): source line 78 is missing from this listing. It declares
// `dig` and so names the digester class, presumably a window-minimizer type
// templated on <P, T>. Restore it from the original header.
79 seq, ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1, k,
80 large_wind_kmer_am, ind, minimized_h);
81 dig.roll_minimizer(assigned_lwind_am, out);
82 return out;
83}
84
// Worker launched once per chunk by the pair-output thread_wind. It covers
// assigned_lwind_am large windows, the first of which starts at index ind.
// The second constructor argument is one past the last character of the
// chunk's final large window. This assumes the argument is an exclusive
// end/length; confirm against window_minimizer.hpp.
85template <digest::BadCharPolicy P, class T>
86std::vector<std::pair<uint32_t, uint32_t>> thread_wind_roll2(
87 const char *seq, size_t ind, unsigned k, uint32_t large_wind_kmer_am,
88 digest::MinimizedHashType minimized_h, unsigned assigned_lwind_am) {
89 std::vector<std::pair<uint32_t, uint32_t>> out;
// NOTE(review): source line 90 is missing from this listing. It declares
// `dig` and so names the digester class, presumably a window-minimizer type
// templated on <P, T>. Restore it from the original header.
91 seq, ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1, k,
92 large_wind_kmer_am, ind, minimized_h);
93 dig.roll_minimizer(assigned_lwind_am, out);
94 return out;
95}
96
97// function that's passed to the thread for Syncmers
// Worker launched once per chunk by thread_sync (uint32_t output). It covers
// assigned_lwind_am large windows, the first of which starts at index ind.
// The second constructor argument is
//   ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1.
// That is one past the last character of the chunk's final large window.
// This assumes the argument is an exclusive end/length; confirm against
// syncmer.hpp.
98template <digest::BadCharPolicy P, class T>
99std::vector<uint32_t> thread_sync_roll1(const char *seq, size_t ind, unsigned k,
100 uint32_t large_wind_kmer_am,
101 digest::MinimizedHashType minimized_h,
102 unsigned assigned_lwind_am) {
103 std::vector<uint32_t> out;
// NOTE(review): source line 104 is missing from this listing. It declares
// `dig` and so names the digester class, presumably a syncmer type templated
// on <P, T>. Restore it from the original header.
105 seq, ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1, k,
106 large_wind_kmer_am, ind, minimized_h);
107 dig.roll_minimizer(assigned_lwind_am, out);
108 return out;
109}
110
// Worker launched once per chunk by the pair-output thread_sync. It covers
// assigned_lwind_am large windows, the first of which starts at index ind.
// The second constructor argument is one past the last character of the
// chunk's final large window. This assumes the argument is an exclusive
// end/length; confirm against syncmer.hpp.
111template <digest::BadCharPolicy P, class T>
112std::vector<std::pair<uint32_t, uint32_t>> thread_sync_roll2(
113 const char *seq, size_t ind, unsigned k, uint32_t large_wind_kmer_am,
114 digest::MinimizedHashType minimized_h, unsigned assigned_lwind_am) {
115 std::vector<std::pair<uint32_t, uint32_t>> out;
// NOTE(review): source line 116 is missing from this listing. It declares
// `dig` and so names the digester class, presumably a syncmer type templated
// on <P, T>. Restore it from the original header.
117 seq, ind + assigned_lwind_am + k + large_wind_kmer_am - 1 - 1, k,
118 large_wind_kmer_am, ind, minimized_h);
119 dig.roll_minimizer(assigned_lwind_am, out);
120 return out;
121}
122
143template <digest::BadCharPolicy P>
145 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
146 const char *seq, size_t len, unsigned k, uint32_t mod,
147 uint32_t congruence = 0, size_t start = 0,
149 int num_kmers = (int)len - (int)start - (int)k + 1;
150 if (k < 4 || start >= len || num_kmers < 0 ||
151 (unsigned)num_kmers < thread_count) {
152 throw BadThreadOutParams();
153 }
154 unsigned kmers_per_thread = num_kmers / thread_count;
155 unsigned extras = num_kmers % thread_count;
156 vec.reserve(thread_count);
157 std::vector<std::future<std::vector<uint32_t>>> thread_vector;
158
159 size_t ind = start;
160 for (unsigned i = 0; i < thread_count; i++) {
161 // issue is here
162 // this will lead to a leak
163 unsigned assigned_kmer_am = kmers_per_thread;
164 if (extras > 0) {
165 ++(assigned_kmer_am);
166 extras--;
167 }
168
169 thread_vector.emplace_back(std::async(thread_mod_roll1<P>, seq, ind, k,
170 mod, congruence, minimized_h,
171 assigned_kmer_am));
172
173 ind += assigned_kmer_am;
174 }
175 for (auto &t : thread_vector) {
176 vec.emplace_back(t.get());
177 }
178}
179
186template <digest::BadCharPolicy P>
188 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
189 const std::string &seq, unsigned k, uint32_t mod, uint32_t congruence = 0,
190 size_t start = 0,
192 thread_mod<P>(thread_count, vec, seq.c_str(), seq.size(), k, mod,
193 congruence, start, minimized_h);
194}
195
203template <digest::BadCharPolicy P>
205 unsigned thread_count,
206 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
207 const char *seq, size_t len, unsigned k, uint32_t mod,
208 uint32_t congruence = 0, size_t start = 0,
210 int num_kmers = (int)len - (int)start - (int)k + 1;
211 if (k < 4 || start >= len || num_kmers < 0 ||
212 (unsigned)num_kmers < thread_count) {
213 throw BadThreadOutParams();
214 }
215 unsigned kmers_per_thread = num_kmers / thread_count;
216 unsigned extras = num_kmers % thread_count;
217 vec.reserve(thread_count);
218 std::vector<std::future<std::vector<std::pair<uint32_t, uint32_t>>>>
219 thread_vector;
220
221 size_t ind = start;
222 for (unsigned i = 0; i < thread_count; i++) {
223 // issue is here
224 // this will lead to a leak
225 unsigned assigned_kmer_am = kmers_per_thread;
226 if (extras > 0) {
227 ++(assigned_kmer_am);
228 extras--;
229 }
230
231 thread_vector.emplace_back(std::async(thread_mod_roll2<P>, seq, ind, k,
232 mod, congruence, minimized_h,
233 assigned_kmer_am));
234
235 ind += assigned_kmer_am;
236 }
237 for (auto &t : thread_vector) {
238 vec.emplace_back(t.get());
239 }
240}
241
249template <digest::BadCharPolicy P>
251 unsigned thread_count,
252 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
253 const std::string &seq, unsigned k, uint32_t mod, uint32_t congruence = 0,
254 size_t start = 0,
256 thread_mod<P>(thread_count, vec, seq.c_str(), seq.size(), k, mod,
257 congruence, start, minimized_h);
258}
259
282template <digest::BadCharPolicy P, class T>
284 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
285 const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am,
286 size_t start = 0,
288 int num_lwinds = (int)len - (int)start - (int)(k + large_wind_kmer_am) + 2;
289 if (large_wind_kmer_am == 0 || k < 4 || start >= len || num_lwinds < 0 ||
290 (unsigned)num_lwinds < thread_count) {
291 throw BadThreadOutParams();
292 }
293 unsigned lwinds_per_thread = num_lwinds / thread_count;
294 unsigned extras = num_lwinds % thread_count;
295 vec.reserve(thread_count);
296 std::vector<std::future<std::vector<uint32_t>>> thread_vector;
297
298 size_t ind = start;
299 for (unsigned i = 0; i < thread_count; i++) {
300 // issue is here
301 // this will lead to a leak
302 unsigned assigned_lwind_am = lwinds_per_thread;
303 if (extras > 0) {
304 ++(assigned_lwind_am);
305 extras--;
306 }
307
308 thread_vector.emplace_back(std::async(thread_wind_roll1<P, T>, seq, ind,
309 k, large_wind_kmer_am,
310 minimized_h, assigned_lwind_am));
311
312 ind += assigned_lwind_am;
313 }
314 for (auto &t : thread_vector) {
315 vec.emplace_back(t.get());
316 }
317
318 // handle duplicates
319 // the only possible place for a duplicate is for the last element
320 // of vec[i] to equal the first value of vec[i+1] due to the fact
321 // that thread_i+1 can't know the last minimizer of thread_i
322 for (unsigned i = 0; i < thread_count - 1; i++) {
323 int last = (int)vec[i].size() - 1;
324 if (vec[i][last] == vec[i + 1][0]) {
325 vec[i].pop_back();
326 }
327 }
328}
329
336template <digest::BadCharPolicy P, class T>
338 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
339 const std::string &seq, unsigned k, uint32_t large_wind_kmer_am,
340 size_t start = 0,
342 thread_wind<P, T>(thread_count, vec, seq.c_str(), seq.size(), k,
343 large_wind_kmer_am, start, minimized_h);
344}
345
353template <digest::BadCharPolicy P, class T>
355 unsigned thread_count,
356 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
357 const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am,
358 size_t start = 0,
360 int num_lwinds = (int)len - (int)start - (int)(k + large_wind_kmer_am) + 2;
361 if (large_wind_kmer_am == 0 || k < 4 || start >= len || num_lwinds < 0 ||
362 (unsigned)num_lwinds < thread_count) {
363 throw BadThreadOutParams();
364 }
365 unsigned lwinds_per_thread = num_lwinds / thread_count;
366 unsigned extras = num_lwinds % thread_count;
367 vec.reserve(thread_count);
368 std::vector<std::future<std::pair<uint32_t, uint32_t>>> thread_vector;
369
370 size_t ind = start;
371 for (unsigned i = 0; i < thread_count; i++) {
372 // issue is here
373 // this will lead to a leak
374 unsigned assigned_lwind_am = lwinds_per_thread;
375 if (extras > 0) {
376 ++(assigned_lwind_am);
377 extras--;
378 }
379
380 thread_vector.emplace_back(std::async(thread_wind_roll2<P, T>, seq, ind,
381 k, large_wind_kmer_am,
382 minimized_h, assigned_lwind_am));
383
384 ind += assigned_lwind_am;
385 }
386 for (auto &t : thread_vector) {
387 vec.emplace_back(t.get());
388 }
389
390 // handle duplicates
391 // the only possible place for a duplicate is for the last element
392 // of vec[i] to equal the first value of vec[i+1] due to the fact
393 // that thread_i+1 can't know the last minimizer of thread_i
394 for (unsigned i = 0; i < thread_count - 1; i++) {
395 int last = (int)vec[i].size() - 1;
396 if (vec[i][last] == vec[i + 1][0]) {
397 vec[i].pop_back();
398 }
399 }
400}
401
409template <digest::BadCharPolicy P, class T>
411 unsigned thread_count,
412 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
413 const std::string &seq, unsigned k, uint32_t large_wind_kmer_am,
414 size_t start = 0,
416 thread_wind<P, T>(thread_count, vec, seq.c_str(), seq.size(), k,
417 large_wind_kmer_am, start, minimized_h);
418}
419
442template <digest::BadCharPolicy P, class T>
444 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
445 const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am,
446 size_t start = 0,
448 int num_lwinds = (int)len - (int)start - (int)(k + large_wind_kmer_am) + 2;
449 if (large_wind_kmer_am == 0 || k < 4 || start >= len || num_lwinds < 0 ||
450 (unsigned)num_lwinds < thread_count) {
451 throw BadThreadOutParams();
452 }
453 unsigned lwinds_per_thread = num_lwinds / thread_count;
454 unsigned extras = num_lwinds % thread_count;
455 vec.reserve(thread_count);
456 std::vector<std::future<std::vector<uint32_t>>> thread_vector;
457
458 size_t ind = start;
459 for (unsigned i = 0; i < thread_count; i++) {
460 // issue is here
461 // this will lead to a leak
462 unsigned assigned_lwind_am = lwinds_per_thread;
463 if (extras > 0) {
464 ++(assigned_lwind_am);
465 extras--;
466 }
467
468 thread_vector.emplace_back(std::async(thread_sync_roll1<P, T>, seq, ind,
469 k, large_wind_kmer_am,
470 minimized_h, assigned_lwind_am));
471
472 ind += assigned_lwind_am;
473 }
474 for (auto &t : thread_vector) {
475
476 vec.emplace_back(t.get());
477 }
478}
479
486template <digest::BadCharPolicy P, class T>
488 unsigned thread_count, std::vector<std::vector<uint32_t>> &vec,
489 const std::string &seq, unsigned k, uint32_t large_wind_kmer_am,
490 size_t start = 0,
492 thread_sync<P, T>(thread_count, vec, seq.c_str(), seq.size(), k,
493 large_wind_kmer_am, start, minimized_h);
494}
495
503template <digest::BadCharPolicy P, class T>
505 unsigned thread_count,
506 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
507 const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am,
508 size_t start = 0,
510 int num_lwinds = (int)len - (int)start - (int)(k + large_wind_kmer_am) + 2;
511 if (large_wind_kmer_am == 0 || k < 4 || start >= len || num_lwinds < 0 ||
512 (unsigned)num_lwinds < thread_count) {
513 throw BadThreadOutParams();
514 }
515 unsigned lwinds_per_thread = num_lwinds / thread_count;
516 unsigned extras = num_lwinds % thread_count;
517 vec.reserve(thread_count);
518 std::vector<std::future<std::vector<std::pair<uint32_t, uint32_t>>>>
519 thread_vector;
520
521 size_t ind = start;
522 for (unsigned i = 0; i < thread_count; i++) {
523 // issue is here
524 // this will lead to a leak
525 unsigned assigned_lwind_am = lwinds_per_thread;
526 if (extras > 0) {
527 ++(assigned_lwind_am);
528 extras--;
529 }
530
531 thread_vector.emplace_back(std::async(thread_sync_roll2<P, T>, seq, ind,
532 k, large_wind_kmer_am,
533 minimized_h, assigned_lwind_am));
534
535 ind += assigned_lwind_am;
536 }
537 for (auto &t : thread_vector) {
538 vec.emplace_back(t.get());
539 }
540}
541
549template <digest::BadCharPolicy P, class T>
551 unsigned thread_count,
552 std::vector<std::vector<std::pair<uint32_t, uint32_t>>> &vec,
553 const std::string &seq, unsigned k, uint32_t large_wind_kmer_am,
554 size_t start = 0,
556 thread_sync<P, T>(thread_count, vec, seq.c_str(), seq.size(), k,
557 large_wind_kmer_am, start, minimized_h);
558}
559
560} // namespace digest::thread_out
561
562#endif // THREAD_OUT_HPP
Child class of Digester that defines a minimizer as a kmer whose hash is equal to some target value a...
Definition mod_minimizer.hpp:30
This class inherits from WindowMinimizer (for implementation reasons), but they represent very different t...
Definition syncmer.hpp:21
Child class of Digester that defines a minimizer as a kmer whose hash is minimal among those in the l...
Definition window_minimizer.hpp:33
Exception thrown when invalid parameters are passed to the thread functions.
Definition thread_out.hpp:36
Possible implementation for multi-threading the digestion of a single sequence. The key thing to note...
Definition thread_out.hpp:30
void thread_mod(unsigned thread_count, std::vector< std::vector< uint32_t > > &vec, const char *seq, size_t len, unsigned k, uint32_t mod, uint32_t congruence=0, size_t start=0, digest::MinimizedHashType minimized_h=digest::MinimizedHashType::CANON)
Definition thread_out.hpp:144
void thread_wind(unsigned thread_count, std::vector< std::vector< uint32_t > > &vec, const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am, size_t start=0, digest::MinimizedHashType minimized_h=digest::MinimizedHashType::CANON)
Definition thread_out.hpp:283
void thread_sync(unsigned thread_count, std::vector< std::vector< uint32_t > > &vec, const char *seq, size_t len, unsigned k, uint32_t large_wind_kmer_am, size_t start=0, digest::MinimizedHashType minimized_h=digest::MinimizedHashType::CANON)
Definition thread_out.hpp:443
MinimizedHashType
Enum values for the type of hash to minimize.
Definition digester.hpp:41