TTK
Loading...
Searching...
No Matches
psort.h
Go to the documentation of this file.
1
12
17
18/*
19Copyright (c) 2009, David Cheng, Viral B. Shah.
20
21Permission is hereby granted, free of charge, to any person obtaining a copy
22of this software and associated documentation files (the "Software"), to deal
23in the Software without restriction, including without limitation the rights
24to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
25copies of the Software, and to permit persons to whom the Software is
26furnished to do so, subject to the following conditions:
27
28The above copyright notice and this permission notice shall be included in
29all copies or substantial portions of the Software.
30
31THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
32IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
33FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
34AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
35LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
36OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
37THE SOFTWARE.
38*/
39
40#pragma once
41#include <Debug.h>
42
43#ifdef TTK_ENABLE_MPI
44namespace p_sort {
45
46 template <typename T, class _Compare>
47 class PermCompare {
48 private:
49 T *weights;
50 _Compare comp;
51
52 public:
53 PermCompare(T *w, _Compare c) : weights(w), comp(c) {
54 }
55 bool operator()(int a, int b) {
56 return comp(weights[a], weights[b]);
57 }
58 };
59
60 template <class _Compare, typename _RandomAccessIter>
61 void psort_split(_RandomAccessIter first,
62 _RandomAccessIter last,
63 ttk::SimplexId *dist,
64 _Compare comp,
65 std::vector<std::vector<ttk::SimplexId>> &right_ends,
66 MPI_Datatype &MPI_valueType,
67 MPI_Datatype &MPI_distanceType,
68 int nThreads) {
69
70 using dataType =
71 typename std::iterator_traits<_RandomAccessIter>::value_type;
72
73 int n_real = ttk::MPIsize_;
74 for(int i = 0; i < ttk::MPIsize_; ++i)
75 if(dist[i] == 0) {
76 n_real = i;
77 break;
78 }
79
80 std::copy(dist, dist + ttk::MPIsize_, right_ends[ttk::MPIsize_].begin());
81
82 // union of [0, right_end[i+1]) on each processor produces dist[i] total
83 // values
84 // ttk::SimplexId targets[ttk::MPIsize_-1];
85 std::vector<ttk::SimplexId> targets(ttk::MPIsize_ - 1);
86 std::partial_sum(dist, dist + (ttk::MPIsize_ - 1), targets.data());
87
88 // keep a list of ranges, trying to "activate" them at each branch
89 std::vector<std::pair<_RandomAccessIter, _RandomAccessIter>> d_ranges(
90 ttk::MPIsize_ - 1);
91 std::vector<std::pair<ttk::SimplexId *, ttk::SimplexId *>> t_ranges(
92 ttk::MPIsize_ - 1);
93 d_ranges[0] = std::make_pair(first, last);
94 t_ranges[0]
95 = std::make_pair(targets.data(), targets.data() + ttk::MPIsize_ - 1);
96
97 std::vector<std::vector<ttk::SimplexId>> subdist(
98 ttk::MPIsize_ - 1, std::vector<ttk::SimplexId>(ttk::MPIsize_));
99 std::copy(dist, dist + ttk::MPIsize_, subdist[0].begin());
100
101 // for each processor, d_ranges - first
102 std::vector<std::vector<ttk::SimplexId>> outleft(
103 ttk::MPIsize_ - 1, std::vector<ttk::SimplexId>(ttk::MPIsize_, 0));
104
105 for(int n_act = 1; n_act > 0;) {
106
107 for(int k = 0; k < n_act; ++k) {
108 assert(subdist[k][ttk::MPIrank_]
109 == d_ranges[k].second - d_ranges[k].first);
110 }
111
112 //------- generate n_act guesses
113
114 // for the allgather, make a flat array of ttk::MPIsize_ chunks, each with
115 // n_act elts
116 std::vector<dataType> medians(ttk::MPIsize_ * n_act);
117 for(int k = 0; k < n_act; ++k) {
118 if(d_ranges[k].first != last) {
119 dataType *ptr = &(*d_ranges[k].first);
120 ttk::SimplexId index = subdist[k][ttk::MPIrank_] / 2;
121 medians[ttk::MPIrank_ * n_act + k] = ptr[index];
122 } else
123 medians[ttk::MPIrank_ * n_act + k] = *(last - 1);
124 }
125
126 MPI_Allgather(MPI_IN_PLACE, n_act, MPI_valueType, &medians[0], n_act,
127 MPI_valueType, ttk::MPIcomm_);
128
129 // compute the weighted median of medians
130 std::vector<dataType> queries(n_act);
131
132 std::vector<ttk::SimplexId> ms_perm(n_real);
133 for(int k = 0; k < n_act; ++k) {
134
135 for(int i = 0; i < n_real; ++i)
136 ms_perm[i] = i * n_act + k;
137 TTK_PSORT(nThreads, ms_perm.data(), ms_perm.data() + n_real,
138 PermCompare<dataType, _Compare>(&medians[0], comp));
139
141 = std::accumulate(
142 subdist[k].begin(), subdist[k].end(), (ttk::SimplexId)0)
143 / 2;
144 ttk::SimplexId query_ind = -1;
145 for(int i = 0; i < n_real; ++i) {
146 if(subdist[k][ms_perm[i] / n_act] == 0)
147 continue;
148
149 mid -= subdist[k][ms_perm[i] / n_act];
150 if(mid <= 0) {
151 query_ind = ms_perm[i];
152 break;
153 }
154 }
155
156 assert(query_ind >= 0);
157 queries[k] = medians[query_ind];
158 }
159 //------- find min and max ranks of the guesses
160 std::vector<ttk::SimplexId> ind_local(2 * n_act);
161
162 for(int k = 0; k < n_act; ++k) {
163 std::pair<_RandomAccessIter, _RandomAccessIter> ind_local_p
164 = equal_range(
165 d_ranges[k].first, d_ranges[k].second, queries[k], comp);
166
167 ind_local[2 * k] = ind_local_p.first - first;
168 ind_local[2 * k + 1] = ind_local_p.second - first;
169 }
170
171 std::vector<ttk::SimplexId> ind_all(2 * n_act * ttk::MPIsize_);
172 MPI_Allgather(ind_local.data(), 2 * n_act, MPI_distanceType,
173 ind_all.data(), 2 * n_act, MPI_distanceType, ttk::MPIcomm_);
174 // sum to get the global range of indices
175 std::vector<std::pair<ttk::SimplexId, ttk::SimplexId>> ind_global(n_act);
176 for(int k = 0; k < n_act; ++k) {
177 ind_global[k] = std::make_pair(0, 0);
178 for(int i = 0; i < ttk::MPIsize_; ++i) {
179 ind_global[k].first += ind_all[2 * (i * n_act + k)];
180 ind_global[k].second += ind_all[2 * (i * n_act + k) + 1];
181 }
182 }
183
184 // state to pass on to next iteration
185 std::vector<std::pair<_RandomAccessIter, _RandomAccessIter>> d_ranges_x(
186 ttk::MPIsize_ - 1);
187 std::vector<std::pair<ttk::SimplexId *, ttk::SimplexId *>> t_ranges_x(
188 ttk::MPIsize_ - 1);
189 std::vector<std::vector<ttk::SimplexId>> subdist_x(
190 ttk::MPIsize_ - 1, std::vector<ttk::SimplexId>(ttk::MPIsize_));
191 std::vector<std::vector<ttk::SimplexId>> outleft_x(
192 ttk::MPIsize_ - 1, std::vector<ttk::SimplexId>(ttk::MPIsize_, 0));
193 int n_act_x = 0;
194
195 for(int k = 0; k < n_act; ++k) {
196 ttk::SimplexId *split_low = std::lower_bound(
197 t_ranges[k].first, t_ranges[k].second, ind_global[k].first);
198 ttk::SimplexId *split_high = std::upper_bound(
199 t_ranges[k].first, t_ranges[k].second, ind_global[k].second);
200
201 // iterate over targets we hit
202 for(ttk::SimplexId *s = split_low; s != split_high; ++s) {
203 assert(*s > 0);
204 // a bit sloppy: if more than one target in range, excess won't zero
205 // out
206 ttk::SimplexId excess = *s - ind_global[k].first;
207 // low procs to high take excess for stability
208 for(int i = 0; i < ttk::MPIsize_; ++i) {
209 ttk::SimplexId amount
210 = std::min(ind_all[2 * (i * n_act + k)] + excess,
211 ind_all[2 * (i * n_act + k) + 1]);
212 right_ends[(s - targets.data()) + 1][i] = amount;
213 excess -= amount - ind_all[2 * (i * n_act + k)];
214 }
215 }
216
217 if((split_low - t_ranges[k].first) > 0) {
218 t_ranges_x[n_act_x] = std::make_pair(t_ranges[k].first, split_low);
219 // lop off local_ind_low..end
220 d_ranges_x[n_act_x]
221 = std::make_pair(d_ranges[k].first, first + ind_local[2 * k]);
222 for(int i = 0; i < ttk::MPIsize_; ++i) {
223 subdist_x[n_act_x][i]
224 = ind_all[2 * (i * n_act + k)] - outleft[k][i];
225 outleft_x[n_act_x][i] = outleft[k][i];
226 }
227 ++n_act_x;
228 }
229
230 if((t_ranges[k].second - split_high) > 0) {
231 t_ranges_x[n_act_x] = std::make_pair(split_high, t_ranges[k].second);
232 // lop off begin..local_ind_high
233 d_ranges_x[n_act_x]
234 = std::make_pair(first + ind_local[2 * k + 1], d_ranges[k].second);
235 for(int i = 0; i < ttk::MPIsize_; ++i) {
236 subdist_x[n_act_x][i] = outleft[k][i] + subdist[k][i]
237 - ind_all[2 * (i * n_act + k) + 1];
238 outleft_x[n_act_x][i] = ind_all[2 * (i * n_act + k) + 1];
239 }
240 ++n_act_x;
241 }
242 }
243
244 t_ranges = t_ranges_x;
245 d_ranges = d_ranges_x;
246 subdist = subdist_x;
247 outleft = outleft_x;
248 n_act = n_act_x;
249 }
250 TTK_FORCE_USE(nThreads);
251 }
252
258 template <typename dataType>
259 static void alltoall(std::vector<std::vector<ttk::SimplexId>> &right_ends,
260 std::vector<dataType> &data,
261 std::vector<dataType> &trans_data,
262 ttk::SimplexId *boundaries,
263 MPI_Datatype &MPI_valueType,
264 MPI_Datatype &MPI_distanceType) {
265 // MPI_Alltoallv requires integers for displacements and counts.
266 // If there is a need to send more, then a second strategy is used
267 ttk::SimplexId n_loc_ = data.size();
268 // char and not boolean because MPI doesn't know boolean
269 char overflowInt{0};
270 ttk::SimplexId chunkSize = n_loc_ / INT_MAX + 1;
271 if(n_loc_ > INT_MAX) {
272 overflowInt = true;
273 }
274 int n_loc = static_cast<int>(n_loc_);
275 // Calculate the counts for redistributing data and detects potential
276 // overflows of INT_MAX
277 std::vector<ttk::SimplexId> send_counts(ttk::MPIsize_);
278 std::vector<ttk::SimplexId> recv_counts(ttk::MPIsize_);
279#pragma omp parallel for reduction(+ : overflowInt)
280 for(int i = 0; i < ttk::MPIsize_; ++i) {
281 ttk::SimplexId scount
282 = right_ends[i + 1][ttk::MPIrank_] - right_ends[i][ttk::MPIrank_];
283 ttk::SimplexId rcount
284 = right_ends[ttk::MPIrank_ + 1][i] - right_ends[ttk::MPIrank_][i];
285 if(scount > INT_MAX || rcount > INT_MAX) {
286 overflowInt = 1;
287 }
288 send_counts[i] = scount;
289 recv_counts[i] = rcount;
290 }
291 MPI_Allreduce(
292 MPI_IN_PLACE, &overflowInt, 1, MPI_CHAR, MPI_SUM, ttk::MPIcomm_);
293 MPI_Allreduce(
294 MPI_IN_PLACE, &chunkSize, 1, MPI_distanceType, MPI_MAX, ttk::MPIcomm_);
295 if(overflowInt == 0) {
296 std::vector<int> send_counts_int(ttk::MPIsize_);
297 std::vector<int> send_disps_int(ttk::MPIsize_);
298 std::vector<int> recv_counts_int(ttk::MPIsize_);
299 std::vector<int> recv_disps_int(ttk::MPIsize_);
300
301 for(int i = 0; i < ttk::MPIsize_; i++) {
302 send_counts_int[i] = static_cast<int>(send_counts[i]);
303 recv_counts_int[i] = static_cast<int>(recv_counts[i]);
304 }
305
306 recv_disps_int[0] = 0;
307 std::partial_sum(recv_counts_int.data(),
308 recv_counts_int.data() + ttk::MPIsize_ - 1,
309 recv_disps_int.data() + 1);
310
311 send_disps_int[0] = 0;
312 std::partial_sum(send_counts_int.data(),
313 send_counts_int.data() + ttk::MPIsize_ - 1,
314 send_disps_int.data() + 1);
315 // Do the transpose
316 MPI_Alltoallv(data.data(), send_counts_int.data(), send_disps_int.data(),
317 MPI_valueType, trans_data.data(), recv_counts_int.data(),
318 recv_disps_int.data(), MPI_valueType, ttk::MPIcomm_);
319
320 for(int i = 0; i < ttk::MPIsize_; ++i)
321 boundaries[i] = (ttk::SimplexId)recv_disps_int[i];
322 boundaries[ttk::MPIsize_] = (ttk::SimplexId)n_loc; // for the merging
323
324 return;
325 }
326
327 // if the alltoall needs to send more than INT_MAX elements,
328 // then multiple messages, of each at most INT_MAX elements,
329 // are sent until all elements are sent.
330 std::vector<ttk::SimplexId> send_disps(ttk::MPIsize_);
331 std::vector<ttk::SimplexId> recv_disps(ttk::MPIsize_);
332 send_disps[0] = 0;
333 std::partial_sum(send_counts.data(), send_counts.data() + ttk::MPIsize_ - 1,
334 send_disps.data() + 1);
335 recv_disps[0] = 0;
336 std::partial_sum(recv_counts.data(), recv_counts.data() + ttk::MPIsize_ - 1,
337 recv_disps.data() + 1);
338
339 int moreToSend{1};
340 int count{0};
341 std::vector<int> partial_recv_count(ttk::MPIsize_, 0);
342 std::vector<int> partial_send_count(ttk::MPIsize_, 0);
343 std::vector<int> partial_recv_displs(ttk::MPIsize_, 0);
344 std::vector<int> partial_send_displs(ttk::MPIsize_, 0);
345 std::vector<dataType> send_buffer_64bits(INT_MAX);
346 std::vector<dataType> recv_buffer_64bits(INT_MAX);
347 ttk::SimplexId messageSize = std::max(INT_MAX / ttk::MPIsize_ - 1, 1);
348 while(moreToSend) {
349 // Preparing the send buffer of at most INT_MAX elements
350 send_buffer_64bits.resize(0);
351 moreToSend = 0;
352 for(int i = 0; i < ttk::MPIsize_; i++) {
353 if(i > 0) {
354 partial_send_displs[i]
355 = partial_send_displs[i - 1] + partial_send_count[i - 1];
356 partial_recv_displs[i]
357 = partial_recv_displs[i - 1] + partial_recv_count[i - 1];
358 }
359 if(send_counts[i] - count * messageSize > 0) {
360 moreToSend = 1;
361 if(send_counts[i] - count * messageSize > messageSize) {
362 partial_send_count[i] = messageSize;
363 } else {
364 partial_send_count[i] = send_counts[i] - count * messageSize;
365 }
366 std::copy(data.begin() + send_disps[i] + count * messageSize,
367 data.begin() + send_disps[i] + count * messageSize
368 + partial_send_count[i],
369 send_buffer_64bits.begin() + partial_send_displs[i]);
370 } else {
371 partial_send_count[i] = 0;
372 }
373 if(recv_counts[i] - count * messageSize > 0) {
374 if(recv_counts[i] - count * messageSize > messageSize) {
375 partial_recv_count[i] = messageSize;
376 } else {
377 partial_recv_count[i] = recv_counts[i] - count * messageSize;
378 }
379 } else {
380 partial_recv_count[i] = 0;
381 }
382 }
383 MPI_Alltoallv(send_buffer_64bits.data(), partial_send_count.data(),
384 partial_send_displs.data(), MPI_valueType,
385 recv_buffer_64bits.data(), partial_recv_count.data(),
386 partial_recv_displs.data(), MPI_valueType, ttk::MPIcomm_);
387 // Receiving the buffer and placing it in the right vector
388 for(int i = 0; i < ttk::MPIsize_; i++) {
389 if(partial_recv_count[i] > 0) {
390 std::copy(recv_buffer_64bits.begin() + partial_recv_displs[i],
391 recv_buffer_64bits.begin() + partial_recv_displs[i]
392 + partial_recv_count[i],
393 trans_data.begin() + recv_disps[i] + count * messageSize);
394 }
395 }
396 count++;
397 // Test to see if more messages need to be sent
398 MPI_Allreduce(
399 MPI_IN_PLACE, &moreToSend, 1, MPI_INTEGER, MPI_SUM, ttk::MPIcomm_);
400 }
401
402 for(int i = 0; i < ttk::MPIsize_; ++i)
403 boundaries[i] = recv_disps[i];
404 boundaries[ttk::MPIsize_] = n_loc_; // for the merging
405
406 return;
407 }
408
409 template <class _Compare, typename _RandomAccessIter>
410 void psort_merge(_RandomAccessIter in,
411 _RandomAccessIter out,
412 ttk::SimplexId *disps,
413 _Compare comp,
414 _Compare oppositeComp) {
415
416 if(ttk::MPIsize_ == 1) {
417 std::copy(in, in + disps[ttk::MPIsize_], out);
418 return;
419 }
420
421 _RandomAccessIter bufs[2] = {in, out};
422
423 std::vector<ttk::SimplexId> locs(ttk::MPIsize_, 0);
424
425 ttk::SimplexId next = 1;
426 while(true) {
427 ttk::SimplexId stride = next * 2;
428 if(stride >= ttk::MPIsize_)
429 break;
430
431 for(ttk::SimplexId i = 0; i + next < ttk::MPIsize_; i += stride) {
432 ttk::SimplexId end_ind
433 = std::min(i + stride, (ttk::SimplexId)ttk::MPIsize_);
434
435 std::merge(bufs[locs[i]] + disps[i], bufs[locs[i]] + disps[i + next],
436 bufs[locs[i + next]] + disps[i + next],
437 bufs[locs[i + next]] + disps[end_ind],
438 bufs[1 - locs[i]] + disps[i], comp);
439 locs[i] = 1 - locs[i];
440 }
441
442 next = stride;
443 }
444
445 // now have 4 cases for final merge
446 if(locs[0] == 0) {
447 // 00, 01 => out of place
448 std::merge(in, in + disps[next], bufs[locs[next]] + disps[next],
449 bufs[locs[next]] + disps[ttk::MPIsize_], out, comp);
450 } else if(locs[next] == 0) {
451 // 10 => backwards out of place
452 std::merge(
453 std::reverse_iterator<_RandomAccessIter>(in + disps[ttk::MPIsize_]),
454 std::reverse_iterator<_RandomAccessIter>(in + disps[next]),
455 std::reverse_iterator<_RandomAccessIter>(out + disps[next]),
456 std::reverse_iterator<_RandomAccessIter>(out),
457 std::reverse_iterator<_RandomAccessIter>(out + disps[ttk::MPIsize_]),
458 oppositeComp);
459 } else {
460 // 11 => in-place
461 std::inplace_merge(
462 out, out + disps[next], out + disps[ttk::MPIsize_], comp);
463 }
464 }
465
482 template <typename dataType, typename _Compare>
483 void parallel_sort(std::vector<dataType> &data,
484 _Compare comp,
485 _Compare oppositeComp,
486 std::vector<ttk::SimplexId> &dist,
487 MPI_Datatype &MPI_valueType,
488 MPI_Datatype &MPI_distanceType,
489 int nThreads) {
490
491 // Sort the data locally
492 TTK_PSORT(nThreads, data.begin(), data.end(), comp);
493
494 if(ttk::MPIsize_ == 1)
495 return;
496
497 // Find splitters
498 std::vector<std::vector<ttk::SimplexId>> right_ends(
499 ttk::MPIsize_ + 1, std::vector<ttk::SimplexId>(ttk::MPIsize_, 0));
500 psort_split<_Compare>(data.begin(), data.end(), dist.data(), comp,
501 right_ends, MPI_valueType, MPI_distanceType,
502 nThreads);
503
504 // Communicate to destination
505 ttk::SimplexId n_loc = data.size();
506 std::vector<dataType> trans_data(n_loc);
507
508 std::vector<ttk::SimplexId> boundaries(ttk::MPIsize_ + 1);
509 alltoall(right_ends, data, trans_data, boundaries.data(), MPI_valueType,
510 MPI_distanceType);
511
512 psort_merge<_Compare>(
513 trans_data.data(), data.data(), boundaries.data(), comp, oppositeComp);
514
515 return;
516 }
517
518} // namespace p_sort
519
520#endif // TTK_ENABLE_MPI
#define TTK_FORCE_USE(x)
Force the compiler to use the function/method parameter.
Definition BaseClass.h:57
#define TTK_PSORT(NTHREADS,...)
Parallel sort macro.
Definition OpenMP.h:46
COMMON_EXPORTS int MPIsize_
Definition BaseClass.cpp:10
COMMON_EXPORTS int MPIrank_
Definition BaseClass.cpp:9
int SimplexId
Identifier type for simplices of any dimension.
Definition DataTypes.h:22
T end(std::pair< T, T > &p)
Definition ripserpy.cpp:472
T begin(std::pair< T, T > &p)
Definition ripserpy.cpp:468