Parallel sorting
The following two programs implement a parallel sample sort for distributed memory machines, first with the plain MPI C interface and then with MPL; the algorithm itself is described step by step in the comment block of the MPL version.

Parallel sort algorithm, MPI version.
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include "mpi.h"

// a simple vector of doubles with an explicit length
typedef struct vector {
  double *data;
  size_t N;
} vector;

// fill a vector with uniformly distributed random numbers from [0, 1)
void fill_random(vector v) {
  for (size_t i = 0; i < v.N; ++i)
    v.data[i] = (double)rand() / (RAND_MAX + 1.);
}
// three-way comparison for qsort; returning the difference of the two
// doubles would truncate to int and yield wrong results
static int cmp_double(const void *p1_, const void *p2_) {
  const double *const p1 = p1_;
  const double *const p2 = p2_;
  return (*p1 == *p2) ? 0 : (*p1 < *p2 ? -1 : 1);
}
// rearrange [first, last) such that all elements smaller than pivot come
// first; returns a pointer to the first element not smaller than pivot
double *partition(double *first, double *last, double pivot) {
  // skip a leading run of elements that are already in place
  for (; first != last; ++first)
    if (!((*first) < pivot))
      break;
  if (first == last)
    return first;
  // swap the remaining elements smaller than pivot to the front
  for (double *i = first + 1; i != last; ++i) {
    if ((*i) < pivot) {
      double temp = *i;
      *i = *first;
      *first = temp;
      ++first;
    }
  }
  return first;
}
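/* Illustrative example: partitioning {0.4, 0.9, 0.1, 0.7} with pivot 0.5
   yields {0.4, 0.1, 0.9, 0.7} and returns a pointer to the first element
   that is not smaller than the pivot (here 0.9); like std::partition in
   C++, the relative order of the elements is not preserved in general. */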
// sort the elements of a distributed vector via sample sort; returns a new
// vector holding this process's portion of the globally sorted data
vector parallel_sort(vector v) {
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  double *local_pivots = malloc(size * sizeof(*local_pivots));
  if (local_pivots == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  double *pivots = malloc(size * (size - 1) * sizeof(*pivots));
  if (pivots == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  // draw size - 1 random samples from the local data
  for (int i = 0; i < size - 1; ++i)
    local_pivots[i] = v.data[(size_t)(v.N * (double)rand() / (RAND_MAX + 1.))];
  // gather and sort all size * (size - 1) samples
  MPI_Allgather(local_pivots, size - 1, MPI_DOUBLE, pivots, size - 1, MPI_DOUBLE,
                MPI_COMM_WORLD);
  qsort(pivots, size * (size - 1), sizeof(double), cmp_double);
  // pick size - 1 pivots from the globally sorted sample
  for (int i = 1; i < size; ++i)
    local_pivots[i - 1] = pivots[i * (size - 1)];
  // partition the local data into size bins with respect to the pivots
  double **pivot_pos = malloc((size + 1) * sizeof(*pivot_pos));
  if (pivot_pos == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  pivot_pos[0] = v.data;
  for (int i = 0; i < size - 1; ++i)
    pivot_pos[i + 1] = partition(pivot_pos[i], v.data + v.N, local_pivots[i]);
  pivot_pos[size] = v.data + v.N;
  // exchange the bin sizes; block_sizes[i * size + j] is the number of
  // elements that process i will send to process j
  int *local_block_sizes = malloc(size * sizeof(*local_block_sizes));
  if (local_block_sizes == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  int *block_sizes = malloc(size * size * sizeof(*block_sizes));
  if (block_sizes == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  for (int i = 0; i < size; ++i)
    local_block_sizes[i] = pivot_pos[i + 1] - pivot_pos[i];
  MPI_Allgather(local_block_sizes, size, MPI_INT, block_sizes, size, MPI_INT,
                MPI_COMM_WORLD);
  // build counts and displacements for the all-to-all exchange
  int send_pos = 0, recv_pos = 0;
  int sendcounts[size], sdispls[size], recvcounts[size], rdispls[size];
  for (int i = 0; i < size; ++i) {
    sendcounts[i] = block_sizes[rank * size + i];
    sdispls[i] = send_pos;
    send_pos += block_sizes[rank * size + i];
    recvcounts[i] = block_sizes[rank + size * i];
    rdispls[i] = recv_pos;
    recv_pos += block_sizes[rank + size * i];
  }
  // redistribute the data such that bin i goes to rank i, then sort locally
  double *v2 = malloc(recv_pos * sizeof(*v2));
  if (v2 == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  MPI_Alltoallv(v.data, sendcounts, sdispls, MPI_DOUBLE, v2, recvcounts, rdispls,
                MPI_DOUBLE, MPI_COMM_WORLD);
  qsort(v2, recv_pos, sizeof(double), cmp_double);
  free(block_sizes);
  free(local_block_sizes);
  free(pivot_pos);
  free(pivots);
  free(local_pivots);
  return (vector){v2, (size_t)recv_pos};
}
int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  // seed each process differently; multiplying by rank would always seed
  // rank 0 with 0
  srand((unsigned)time(NULL) + rank);
  // distribute 10^8 elements over all processes
  const size_t N = 100000000 / size;
  double *v = malloc(N * sizeof(*v));
  if (v == NULL)
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
  fill_random((vector){v, N});
  vector sorted = parallel_sort((vector){v, N});
  free(sorted.data);
  free(v);
  MPI_Finalize();
  return EXIT_SUCCESS;
}
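The program is built and launched with the usual MPI toolchain, e.g., mpicc and mpirun. For testing, one may want to verify that the result is globally sorted. The following sketch is illustrative and not part of the original program (the function name is_globally_sorted is made up here, and it assumes that every process ends up with a non-empty block); it checks that each block is sorted locally and that block boundaries are ordered across neighbouring ranks, using only standard MPI calls:

// illustrative sanity check; assumes v.N > 0 on every rank
int is_globally_sorted(vector v) {
  int rank, size, ok = 1;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  // the local block must be sorted
  for (size_t i = 1; i < v.N; ++i)
    if (v.data[i] < v.data[i - 1])
      ok = 0;
  // pass the last element to the right neighbour; MPI_PROC_NULL turns the
  // send and receive at the boundary ranks into no-ops
  double last = v.data[v.N - 1], prev_last = 0;
  MPI_Sendrecv(&last, 1, MPI_DOUBLE, rank + 1 < size ? rank + 1 : MPI_PROC_NULL, 0,
               &prev_last, 1, MPI_DOUBLE, rank > 0 ? rank - 1 : MPI_PROC_NULL, 0,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  // the first local element must not be smaller than the neighbour's last
  if (rank > 0 && v.data[0] < prev_last)
    ok = 0;
  // combine the per-rank results into a global verdict
  int all_ok;
  MPI_Allreduce(&ok, &all_ok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
  return all_ok;
}

Each rank could call this on the vector returned by parallel_sort and abort if it reports failure.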
Parallel sort algorithm, MPL version.
#include <cstdlib>
#include <random>
#include <vector>
#include <algorithm>
#include <mpl/mpl.hpp>

static std::random_device rd;
static std::mt19937_64 mt(rd());

// return a uniformly distributed random number from [0, 1)
double get_random() {
  std::uniform_real_distribution<double> dist(0, 1);
  return dist(mt);
}

// fill a vector with random numbers
void fill_random(std::vector<double> &v) {
  std::generate(std::begin(v), std::end(v), get_random);
}
// parallel sort algorithm for distributed memory computers
//
// the algorithm works as follows:
// 1) each process draws (size-1) random samples from its local data
// 2) all processes gather the local random samples => size*(size-1) samples
// 3) the size*(size-1) samples are sorted locally
// 4) pick (size-1) pivot elements from the globally sorted sample
// 5) partition the local data with respect to the pivot elements into size bins
// 6) redistribute the data such that data in bin i goes to the process with rank i
// 7) sort the redistributed data locally
//
// Note that the amount of data each process holds changes during the
// algorithm.  In the worst case, a single process may end up holding all
// the data.
//
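// Illustrative walk-through with size = 2 (not part of the original
// description): each process draws one sample; the two gathered samples are
// sorted and the single pivot pivots[1] splits every local array into two
// bins; the all-to-all exchange then moves all elements below the pivot to
// rank 0 and the rest to rank 1, where each part is sorted locally.
//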
template<typename T>
void parallel_sort(std::vector<T> &v) {
  auto &comm_world{mpl::environment::comm_world()};
  const int rank{comm_world.rank()};
  const int size{comm_world.size()};
  // 1) draw random samples and 2) gather them from all processes
  std::vector<T> local_pivots, pivots(size * (size - 1));
  std::sample(begin(v), end(v), std::back_inserter(local_pivots), size - 1, mt);
  comm_world.allgather(local_pivots.data(), mpl::vector_layout<T>(size - 1),
                       pivots.data(), mpl::vector_layout<T>(size - 1));
  // 3) sort the gathered samples and 4) pick the pivots
  std::sort(begin(pivots), end(pivots));
  local_pivots.clear();
  for (std::size_t i{1}; i < static_cast<std::size_t>(size); ++i)
    local_pivots.push_back(pivots[i * (size - 1)]);
  swap(local_pivots, pivots);
  // 5) partition the local data into size bins
  std::vector<typename std::vector<T>::iterator> pivot_pos;
  pivot_pos.push_back(begin(v));
  for (T p : pivots)
    pivot_pos.push_back(std::partition(pivot_pos.back(), end(v), [p](T x) { return x < p; }));
  pivot_pos.push_back(end(v));
  // exchange the bin sizes of all processes
  std::vector<int> local_block_sizes, block_sizes(size * size);
  for (std::size_t i{0}; i < pivot_pos.size() - 1; ++i)
    local_block_sizes.push_back(
        static_cast<int>(std::distance(pivot_pos[i], pivot_pos[i + 1])));
  comm_world.allgather(local_block_sizes.data(), mpl::vector_layout<int>(size),
                       block_sizes.data(), mpl::vector_layout<int>(size));
  // 6) redistribute the data; displacements are encoded in indexed layouts
  mpl::layouts<T> send_layouts, recv_layouts;
  int send_pos{0}, recv_pos{0};
  for (int i{0}; i < size; ++i) {
    send_layouts.push_back(mpl::indexed_layout<T>({{block_sizes[rank * size + i], send_pos}}));
    send_pos += block_sizes[rank * size + i];
    recv_layouts.push_back(mpl::indexed_layout<T>({{block_sizes[rank + size * i], recv_pos}}));
    recv_pos += block_sizes[rank + size * i];
  }
  std::vector<T> v_2(recv_pos);
  comm_world.alltoallv(v.data(), send_layouts, v_2.data(), recv_layouts);
  // 7) sort the redistributed data locally
  std::sort(begin(v_2), end(v_2));
  swap(v, v_2);
}
int main() {
  const auto &comm_world{mpl::environment::comm_world()};
  const int size{comm_world.size()};
  // distribute 10^8 elements over all processes
  const std::size_t N{100000000 / static_cast<std::size_t>(size)};
  std::vector<double> v(N);
  fill_random(v);
  parallel_sort(v);
  return EXIT_SUCCESS;
}