TTK
Loading...
Searching...
No Matches
psort.h
Go to the documentation of this file.
1
12
17
18/*
19Copyright (c) 2009, David Cheng, Viral B. Shah.
20
21Permission is hereby granted, free of charge, to any person obtaining a copy
22of this software and associated documentation files (the "Software"), to deal
23in the Software without restriction, including without limitation the rights
24to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
25copies of the Software, and to permit persons to whom the Software is
26furnished to do so, subject to the following conditions:
27
28The above copyright notice and this permission notice shall be included in
29all copies or substantial portions of the Software.
30
31THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
32IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
33FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
34AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
35LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
36OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
37THE SOFTWARE.
38*/
39
#pragma once

#include <Debug.h>

#include <algorithm>
#include <cassert>
#include <climits>
#include <iterator>
#include <numeric>
#include <utility>
#include <vector>
42
43#ifdef TTK_ENABLE_MPI
44namespace p_sort {
45
46 template <typename T, class _Compare>
47 class PermCompare {
48 private:
49 T *weights;
50 _Compare comp;
51
52 public:
53 PermCompare(T *w, _Compare c) : weights(w), comp(c) {
54 }
55 bool operator()(int a, int b) {
56 return comp(weights[a], weights[b]);
57 }
58 };
59
60 template <class _Compare, typename _RandomAccessIter>
61 void psort_split(_RandomAccessIter first,
62 _RandomAccessIter last,
63 ttk::SimplexId *dist,
64 _Compare comp,
65 std::vector<std::vector<ttk::SimplexId>> &right_ends,
66 MPI_Datatype &MPI_valueType,
67 MPI_Datatype &MPI_distanceType,
68 int nThreads,
69 int localSize,
70 int localRank,
71 MPI_Comm &localComm) {
72
73 using dataType =
74 typename std::iterator_traits<_RandomAccessIter>::value_type;
75
76 int n_real = localSize;
77 for(int i = 0; i < localSize; ++i)
78 if(dist[i] == 0) {
79 n_real = i;
80 break;
81 }
82
83 std::copy(dist, dist + localSize, right_ends[localSize].begin());
84
85 // union of [0, right_end[i+1]) on each processor produces dist[i] total
86 // values
87 // ttk::SimplexId targets[localSize-1];
88 std::vector<ttk::SimplexId> targets(localSize - 1);
89 std::partial_sum(dist, dist + (localSize - 1), targets.data());
90
91 // keep a list of ranges, trying to "activate" them at each branch
92 std::vector<std::pair<_RandomAccessIter, _RandomAccessIter>> d_ranges(
93 localSize - 1);
94 std::vector<std::pair<ttk::SimplexId *, ttk::SimplexId *>> t_ranges(
95 localSize - 1);
96 d_ranges[0] = std::make_pair(first, last);
97 t_ranges[0]
98 = std::make_pair(targets.data(), targets.data() + localSize - 1);
99
100 std::vector<std::vector<ttk::SimplexId>> subdist(
101 localSize - 1, std::vector<ttk::SimplexId>(localSize));
102 std::copy(dist, dist + localSize, subdist[0].begin());
103
104 // for each processor, d_ranges - first
105 std::vector<std::vector<ttk::SimplexId>> outleft(
106 localSize - 1, std::vector<ttk::SimplexId>(localSize, 0));
107
108 for(int n_act = 1; n_act > 0;) {
109 // std::cout << " n_act: "<< n_act<< std::endl;
110 for(int k = 0; k < n_act; ++k) {
111 assert(subdist[k][localRank] == d_ranges[k].second - d_ranges[k].first);
112 }
113
114 //------- generate n_act guesses
115
116 // for the allgather, make a flat array of localSize chunks, each with
117 // n_act elts
118 std::vector<dataType> medians(localSize * n_act);
119 for(int k = 0; k < n_act; ++k) {
120 if(d_ranges[k].first != last) {
121 dataType *ptr = &(*d_ranges[k].first);
122 ttk::SimplexId index = subdist[k][localRank] / 2;
123 medians[localRank * n_act + k] = ptr[index];
124 } else {
125 medians[localRank * n_act + k] = *(last - 1);
126 }
127 }
128 MPI_Allgather(MPI_IN_PLACE, n_act, MPI_valueType, &medians[0], n_act,
129 MPI_valueType, localComm);
130
131 // compute the weighted median of medians
132 std::vector<dataType> queries(n_act);
133
134 std::vector<ttk::SimplexId> ms_perm(n_real);
135 for(int k = 0; k < n_act; ++k) {
136
137 for(int i = 0; i < n_real; ++i)
138 ms_perm[i] = i * n_act + k;
139 TTK_PSORT(nThreads, ms_perm.data(), ms_perm.data() + n_real,
140 PermCompare<dataType, _Compare>(&medians[0], comp));
141
143 = std::accumulate(
144 subdist[k].begin(), subdist[k].end(), (ttk::SimplexId)0)
145 / 2;
146 ttk::SimplexId query_ind = -1;
147 for(int i = 0; i < n_real; ++i) {
148 if(subdist[k][ms_perm[i] / n_act] == 0)
149 continue;
150
151 mid -= subdist[k][ms_perm[i] / n_act];
152 if(mid <= 0) {
153 query_ind = ms_perm[i];
154 break;
155 }
156 }
157
158 assert(query_ind >= 0);
159 queries[k] = medians[query_ind];
160 }
161 //------- find min and max ranks of the guesses
162 std::vector<ttk::SimplexId> ind_local(2 * n_act);
163
164 for(int k = 0; k < n_act; ++k) {
165 std::pair<_RandomAccessIter, _RandomAccessIter> ind_local_p
166 = equal_range(
167 d_ranges[k].first, d_ranges[k].second, queries[k], comp);
168
169 ind_local[2 * k] = ind_local_p.first - first;
170 ind_local[2 * k + 1] = ind_local_p.second - first;
171 }
172
173 std::vector<ttk::SimplexId> ind_all(2 * n_act * localSize);
174 MPI_Allgather(ind_local.data(), 2 * n_act, MPI_distanceType,
175 ind_all.data(), 2 * n_act, MPI_distanceType, localComm);
176 // sum to get the global range of indices
177 std::vector<std::pair<ttk::SimplexId, ttk::SimplexId>> ind_global(n_act);
178 for(int k = 0; k < n_act; ++k) {
179 ind_global[k] = std::make_pair(0, 0);
180 for(int i = 0; i < localSize; ++i) {
181 ind_global[k].first += ind_all[2 * (i * n_act + k)];
182 ind_global[k].second += ind_all[2 * (i * n_act + k) + 1];
183 }
184 }
185
186 // state to pass on to next iteration
187 std::vector<std::pair<_RandomAccessIter, _RandomAccessIter>> d_ranges_x(
188 localSize - 1);
189 std::vector<std::pair<ttk::SimplexId *, ttk::SimplexId *>> t_ranges_x(
190 localSize - 1);
191 std::vector<std::vector<ttk::SimplexId>> subdist_x(
192 localSize - 1, std::vector<ttk::SimplexId>(localSize));
193 std::vector<std::vector<ttk::SimplexId>> outleft_x(
194 localSize - 1, std::vector<ttk::SimplexId>(localSize, 0));
195 int n_act_x = 0;
196
197 for(int k = 0; k < n_act; ++k) {
198 ttk::SimplexId *split_low = std::lower_bound(
199 t_ranges[k].first, t_ranges[k].second, ind_global[k].first);
200 ttk::SimplexId *split_high = std::upper_bound(
201 t_ranges[k].first, t_ranges[k].second, ind_global[k].second);
202
203 // iterate over targets we hit
204 for(ttk::SimplexId *s = split_low; s != split_high; ++s) {
205 assert(*s > 0);
206 // a bit sloppy: if more than one target in range, excess won't zero
207 // out
208 ttk::SimplexId excess = *s - ind_global[k].first;
209 // low procs to high take excess for stability
210 for(int i = 0; i < localSize; ++i) {
211 ttk::SimplexId amount
212 = std::min(ind_all[2 * (i * n_act + k)] + excess,
213 ind_all[2 * (i * n_act + k) + 1]);
214 right_ends[(s - targets.data()) + 1][i] = amount;
215 excess -= amount - ind_all[2 * (i * n_act + k)];
216 }
217 }
218
219 if((split_low - t_ranges[k].first) > 0) {
220 t_ranges_x[n_act_x] = std::make_pair(t_ranges[k].first, split_low);
221 // lop off local_ind_low..end
222 d_ranges_x[n_act_x]
223 = std::make_pair(d_ranges[k].first, first + ind_local[2 * k]);
224 for(int i = 0; i < localSize; ++i) {
225 subdist_x[n_act_x][i]
226 = ind_all[2 * (i * n_act + k)] - outleft[k][i];
227 outleft_x[n_act_x][i] = outleft[k][i];
228 }
229 ++n_act_x;
230 }
231
232 if((t_ranges[k].second - split_high) > 0) {
233 t_ranges_x[n_act_x] = std::make_pair(split_high, t_ranges[k].second);
234 // lop off begin..local_ind_high
235 d_ranges_x[n_act_x]
236 = std::make_pair(first + ind_local[2 * k + 1], d_ranges[k].second);
237 for(int i = 0; i < localSize; ++i) {
238 subdist_x[n_act_x][i] = outleft[k][i] + subdist[k][i]
239 - ind_all[2 * (i * n_act + k) + 1];
240 outleft_x[n_act_x][i] = ind_all[2 * (i * n_act + k) + 1];
241 }
242 ++n_act_x;
243 }
244 }
245
246 t_ranges = t_ranges_x;
247 d_ranges = d_ranges_x;
248 subdist = subdist_x;
249 outleft = outleft_x;
250 n_act = n_act_x;
251 }
252 TTK_FORCE_USE(nThreads);
253 }
254
260 template <typename dataType>
261 static void alltoall(std::vector<std::vector<ttk::SimplexId>> &right_ends,
262 std::vector<dataType> &data,
263 std::vector<dataType> &trans_data,
264 ttk::SimplexId *boundaries,
265 MPI_Datatype &MPI_valueType,
266 MPI_Datatype &MPI_distanceType,
267 int localSize,
268 int localRank,
269 MPI_Comm &localComm) {
270 // MPI_Alltoallv requires integers for displacements and counts.
271 // If there is a need to send more, then a second strategy is used
272 ttk::SimplexId n_loc_ = data.size();
273 // char and not boolean because MPI doesn't know boolean
274 char overflowInt{0};
275 ttk::SimplexId chunkSize = n_loc_ / INT_MAX + 1;
276 if(n_loc_ > INT_MAX) {
277 overflowInt = true;
278 }
279 int n_loc = static_cast<int>(n_loc_);
280 // Calculate the counts for redistributing data and detects potential
281 // overflows of INT_MAX
282 std::vector<ttk::SimplexId> send_counts(localSize);
283 std::vector<ttk::SimplexId> recv_counts(localSize);
284#pragma omp parallel for reduction(+ : overflowInt)
285 for(int i = 0; i < localSize; ++i) {
286 ttk::SimplexId scount
287 = right_ends[i + 1][localRank] - right_ends[i][localRank];
288 ttk::SimplexId rcount
289 = right_ends[localRank + 1][i] - right_ends[localRank][i];
290 if(scount > INT_MAX || rcount > INT_MAX) {
291 overflowInt = 1;
292 }
293 send_counts[i] = scount;
294 recv_counts[i] = rcount;
295 }
296 MPI_Allreduce(MPI_IN_PLACE, &overflowInt, 1, MPI_CHAR, MPI_SUM, localComm);
297 MPI_Allreduce(
298 MPI_IN_PLACE, &chunkSize, 1, MPI_distanceType, MPI_MAX, localComm);
299 if(overflowInt == 0) {
300 std::vector<int> send_counts_int(localSize);
301 std::vector<int> send_disps_int(localSize);
302 std::vector<int> recv_counts_int(localSize);
303 std::vector<int> recv_disps_int(localSize);
304
305 for(int i = 0; i < localSize; i++) {
306 send_counts_int[i] = static_cast<int>(send_counts[i]);
307 recv_counts_int[i] = static_cast<int>(recv_counts[i]);
308 }
309
310 recv_disps_int[0] = 0;
311 std::partial_sum(recv_counts_int.data(),
312 recv_counts_int.data() + localSize - 1,
313 recv_disps_int.data() + 1);
314
315 send_disps_int[0] = 0;
316 std::partial_sum(send_counts_int.data(),
317 send_counts_int.data() + localSize - 1,
318 send_disps_int.data() + 1);
319 // Do the transpose
320 MPI_Alltoallv(data.data(), send_counts_int.data(), send_disps_int.data(),
321 MPI_valueType, trans_data.data(), recv_counts_int.data(),
322 recv_disps_int.data(), MPI_valueType, localComm);
323
324 for(int i = 0; i < localSize; ++i)
325 boundaries[i] = (ttk::SimplexId)recv_disps_int[i];
326 boundaries[localSize] = (ttk::SimplexId)n_loc; // for the merging
327
328 return;
329 }
330
331 // if the alltoall needs to send more than INT_MAX elements,
332 // then multiple messages, of each at most INT_MAX elements,
333 // are sent until all elements are sent.
334 std::vector<ttk::SimplexId> send_disps(localSize);
335 std::vector<ttk::SimplexId> recv_disps(localSize);
336 send_disps[0] = 0;
337 std::partial_sum(send_counts.data(), send_counts.data() + localSize - 1,
338 send_disps.data() + 1);
339 recv_disps[0] = 0;
340 std::partial_sum(recv_counts.data(), recv_counts.data() + localSize - 1,
341 recv_disps.data() + 1);
342
343 int moreToSend{1};
344 int count{0};
345 std::vector<int> partial_recv_count(localSize, 0);
346 std::vector<int> partial_send_count(localSize, 0);
347 std::vector<int> partial_recv_displs(localSize, 0);
348 std::vector<int> partial_send_displs(localSize, 0);
349 std::vector<dataType> send_buffer_64bits(INT_MAX);
350 std::vector<dataType> recv_buffer_64bits(INT_MAX);
351 ttk::SimplexId messageSize = std::max(INT_MAX / localSize - 1, 1);
352 while(moreToSend) {
353 // Preparing the send buffer of at most INT_MAX elements
354 send_buffer_64bits.resize(0);
355 moreToSend = 0;
356 for(int i = 0; i < localSize; i++) {
357 if(i > 0) {
358 partial_send_displs[i]
359 = partial_send_displs[i - 1] + partial_send_count[i - 1];
360 partial_recv_displs[i]
361 = partial_recv_displs[i - 1] + partial_recv_count[i - 1];
362 }
363 if(send_counts[i] - count * messageSize > 0) {
364 moreToSend = 1;
365 if(send_counts[i] - count * messageSize > messageSize) {
366 partial_send_count[i] = messageSize;
367 } else {
368 partial_send_count[i] = send_counts[i] - count * messageSize;
369 }
370 std::copy(data.begin() + send_disps[i] + count * messageSize,
371 data.begin() + send_disps[i] + count * messageSize
372 + partial_send_count[i],
373 send_buffer_64bits.begin() + partial_send_displs[i]);
374 } else {
375 partial_send_count[i] = 0;
376 }
377 if(recv_counts[i] - count * messageSize > 0) {
378 if(recv_counts[i] - count * messageSize > messageSize) {
379 partial_recv_count[i] = messageSize;
380 } else {
381 partial_recv_count[i] = recv_counts[i] - count * messageSize;
382 }
383 } else {
384 partial_recv_count[i] = 0;
385 }
386 }
387 MPI_Alltoallv(send_buffer_64bits.data(), partial_send_count.data(),
388 partial_send_displs.data(), MPI_valueType,
389 recv_buffer_64bits.data(), partial_recv_count.data(),
390 partial_recv_displs.data(), MPI_valueType, localComm);
391 // Receiving the buffer and placing it in the right vector
392 for(int i = 0; i < localSize; i++) {
393 if(partial_recv_count[i] > 0) {
394 std::copy(recv_buffer_64bits.begin() + partial_recv_displs[i],
395 recv_buffer_64bits.begin() + partial_recv_displs[i]
396 + partial_recv_count[i],
397 trans_data.begin() + recv_disps[i] + count * messageSize);
398 }
399 }
400 count++;
401 // Test to see if more messages need to be sent
402 MPI_Allreduce(
403 MPI_IN_PLACE, &moreToSend, 1, MPI_INTEGER, MPI_SUM, localComm);
404 }
405
406 for(int i = 0; i < localSize; ++i)
407 boundaries[i] = recv_disps[i];
408 boundaries[localSize] = n_loc_; // for the merging
409
410 return;
411 }
412
  /// Merges the localSize sorted pieces received from the alltoall into a
  /// single sorted sequence in `out`, using pairwise merging with two
  /// alternating buffers.
  ///
  /// \param in           buffer holding the sorted pieces, piece i in
  ///                     [disps[i], disps[i+1])
  /// \param out          destination buffer (same length, disps[localSize])
  /// \param disps        piece boundaries, disps[0] = 0
  /// \param comp         ordering of the data
  /// \param oppositeComp reversed ordering, used for the backwards merge case
  /// \param localSize    number of pieces
  template <class _Compare, typename _RandomAccessIter>
  void psort_merge(_RandomAccessIter in,
                   _RandomAccessIter out,
                   ttk::SimplexId *disps,
                   _Compare comp,
                   _Compare oppositeComp,
                   int localSize) {

    // single piece: nothing to merge, just move it to the output buffer
    if(localSize == 1) {
      std::copy(in, in + disps[localSize], out);
      return;
    }

    // bufs[0]/bufs[1]: the two ping-pong buffers; locs[i] records which
    // buffer currently holds the merged run starting at disps[i]
    _RandomAccessIter bufs[2] = {in, out};

    std::vector<ttk::SimplexId> locs(localSize, 0);

    ttk::SimplexId next = 1;
    while(true) {
      // merge runs of length `next` pieces into runs of length `stride`,
      // stopping one doubling before the final merge (handled below)
      ttk::SimplexId stride = next * 2;
      if(stride >= localSize)
        break;

      for(ttk::SimplexId i = 0; i + next < localSize; i += stride) {
        ttk::SimplexId end_ind
          = std::min(i + stride, (ttk::SimplexId)localSize);

        // merge the run at i with the run at i + next into the other buffer
        std::merge(bufs[locs[i]] + disps[i], bufs[locs[i]] + disps[i + next],
                   bufs[locs[i + next]] + disps[i + next],
                   bufs[locs[i + next]] + disps[end_ind],
                   bufs[1 - locs[i]] + disps[i], comp);
        locs[i] = 1 - locs[i]; // the merged run now lives in the other buffer
      }

      next = stride;
    }

    // now have 4 cases for final merge, depending on which buffer holds
    // each of the two remaining runs; the result must end up in `out`
    if(locs[0] == 0) {
      // 00, 01 => out of place
      std::merge(in, in + disps[next], bufs[locs[next]] + disps[next],
                 bufs[locs[next]] + disps[localSize], out, comp);
    } else if(locs[next] == 0) {
      // 10 => backwards out of place: merge in reverse with the opposite
      // comparator so the output range (which overlaps the first input run)
      // is written back-to-front without clobbering unread input
      std::merge(
        std::reverse_iterator<_RandomAccessIter>(in + disps[localSize]),
        std::reverse_iterator<_RandomAccessIter>(in + disps[next]),
        std::reverse_iterator<_RandomAccessIter>(out + disps[next]),
        std::reverse_iterator<_RandomAccessIter>(out),
        std::reverse_iterator<_RandomAccessIter>(out + disps[localSize]),
        oppositeComp);
    } else {
      // 11 => in-place
      std::inplace_merge(out, out + disps[next], out + disps[localSize], comp);
    }
  }
469
486 template <typename dataType, typename _Compare>
487 void parallel_sort(std::vector<dataType> &data,
488 _Compare comp,
489 _Compare oppositeComp,
490 std::vector<ttk::SimplexId> &dist,
491 MPI_Datatype &MPI_valueType,
492 MPI_Datatype &MPI_distanceType,
493 int nThreads) {
494 int isEmpty = (data.size() == 0);
495 MPI_Comm localComm;
496 int localRank;
497 int localSize;
498 MPI_Comm_split(ttk::MPIcomm_, isEmpty, ttk::MPIrank_, &localComm);
499 MPI_Comm_rank(localComm, &localRank);
500 MPI_Comm_size(localComm, &localSize);
501 if(isEmpty) {
502 return;
503 }
504 if(static_cast<int>(dist.size()) != localSize) {
505 dist.resize(localSize);
506 ttk::SimplexId dataSize = data.size();
507 MPI_Allgather(&dataSize, 1, MPI_distanceType, dist.data(), 1,
508 MPI_distanceType, localComm);
509 }
510
511 // Sort the data locally
512 TTK_PSORT(nThreads, data.begin(), data.end(), comp);
513
514 if(localSize == 1)
515 return;
516
517 // Find splitters
518 std::vector<std::vector<ttk::SimplexId>> right_ends(
519 localSize + 1, std::vector<ttk::SimplexId>(localSize, 0));
520 psort_split<_Compare>(data.begin(), data.end(), dist.data(), comp,
521 right_ends, MPI_valueType, MPI_distanceType, nThreads,
522 localSize, localRank, localComm);
523
524 // Communicate to destination
525 ttk::SimplexId n_loc = data.size();
526 std::vector<dataType> trans_data(n_loc);
527
528 std::vector<ttk::SimplexId> boundaries(localSize + 1);
529 alltoall(right_ends, data, trans_data, boundaries.data(), MPI_valueType,
530 MPI_distanceType, localSize, localRank, localComm);
531
532 psort_merge<_Compare>(trans_data.data(), data.data(), boundaries.data(),
533 comp, oppositeComp, localSize);
534
535 return;
536 }
537
538} // namespace p_sort
539
540#endif // TTK_ENABLE_MPI
#define TTK_FORCE_USE(x)
Force the compiler to use the function/method parameter.
Definition BaseClass.h:57
#define TTK_PSORT(NTHREADS,...)
Parallel sort macro.
Definition OpenMP.h:46
bool comp(const PersistencePair a, const PersistencePair b)
int SimplexId
Identifier type for simplices of any dimension.
Definition DataTypes.h:22
COMMON_EXPORTS int MPIrank_
Definition BaseClass.cpp:9
T end(std::pair< T, T > &p)
Definition ripser.cpp:503
T begin(std::pair< T, T > &p)
Definition ripser.cpp:499