diff --git a/include/utilities.hpp b/include/utilities.hpp index 688a512..a480d04 100644 --- a/include/utilities.hpp +++ b/include/utilities.hpp @@ -5,7 +5,7 @@ template bool is_pow_of_two(Int n){ - return (n & (n - 1)) == 0; + return n && !(n & (n - 1)); } template @@ -20,6 +20,13 @@ inline unsigned int two_log(unsigned int x){ return 8*sizeof(unsigned int) - unsigned(__builtin_clz(x-1)); } +// calculates 2^x (NOTE: can be improved by exponentiation by squaring) +inline unsigned int pow_two(unsigned int x){ + unsigned int y = 1; + while(x--) y *= 2; + return y; +} + // Makes numbers human-readable with one decimal // eg 2350000 becomes 2.3M template diff --git a/wavelet/jcmp_quantization.hpp b/wavelet/jcmp_quantization.hpp index 3f06ca2..b904edc 100644 --- a/wavelet/jcmp_quantization.hpp +++ b/wavelet/jcmp_quantization.hpp @@ -54,11 +54,11 @@ namespace jcmp { }; // non-overloaded versions - double id(double x) { return x; } - double log(double x) { return std::log(x); } - double exp(double x) { return std::exp(x); } - double sqrt(double x) { return std::sqrt(x); } - double sqr(double x) { return x*x; } + inline double id(double x) { return x; } + inline double log(double x) { return std::log(x); } + inline double exp(double x) { return std::exp(x); } + inline double sqrt(double x) { return std::sqrt(x); } + inline double sqr(double x) { return x*x; } base get(type t, double max_abs, double min_abs, bool apply = true){ base b; diff --git a/wavelet/wavelet_1.hpp b/wavelet/wavelet_1.hpp index 77a083c..9cb8d34 100644 --- a/wavelet/wavelet_1.hpp +++ b/wavelet/wavelet_1.hpp @@ -6,6 +6,11 @@ #include "periodic_iterator.hpp" #include "wavelet_constants.hpp" +/* This header is deprecated. Use wavelet_2.hpp instead. It's still here + * for checking correctness of implementations, as this one is correct, but + * very naive (and hence slow). + */ + namespace wvlt { namespace V1 { // Apply the matrix Wn with the DAUB4 coefficients diff --git a/wavelet/wavelet_2.hpp b/wavelet/wavelet_2.hpp index 0c22702..4057a95 100644 --- a/wavelet/wavelet_2.hpp +++ b/wavelet/wavelet_2.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include "wavelet_constants.hpp" diff --git a/wavelet/wavelet_parallel.hpp b/wavelet/wavelet_parallel.hpp new file mode 100644 index 0000000..f8d1c0a --- /dev/null +++ b/wavelet/wavelet_parallel.hpp @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include "wavelet.hpp" + +/* In the following function we assume any in-parameter to be already + * bsp::pushed. And the functions won't do any bsp::sync at the end. Both + * conventions make it possible to chains functions with lesser syncs. + * + * Distribution is block distribution. + */ + +namespace wvlt { + namespace par { + // Convenience container of some often-used values + // n = inputisze, p = nproc(), s = pid() + // b = blocksize, prev/next = previous and next processor index + struct distribution { + unsigned int n, p, s, b, prev, next; + + distribution(unsigned int n_, unsigned int p_, unsigned int s_) + : n(n_), p(p_), s(s_), b(n/p), prev((s-1+p)%p), next((s+1)%p) + {} + }; + + inline unsigned int communication_size(unsigned int m){ + return pow_two(m+1) - 2; + } + + inline void step(distribution const & d, double* x, double* other, unsigned int size, unsigned int stride, unsigned int m){ + unsigned int t = d.prev; + unsigned int Cm = communication_size(m); + for(unsigned int i = 0; i < Cm; ++i){ + bsp::put(t, &x[stride*i], other, i, 1); + } + bsp::sync(); + + unsigned int end = pow_two(m); + for(unsigned int i = 1; i < end; i <<= 1){ + wavelet_mul(x, other[0], other[i], size, stride*i); + if(i < end/2) wavelet_mul_base(other, 2*end - 2*i, i); + } + } + + inline void base(distribution const & d, double* x, double* other, unsigned int size, unsigned int m){ + unsigned int t = two_log(d.b); + unsigned int steps = (t-1)/m; + + unsigned int stride = 1; + for(unsigned int i = steps; i; i--){ + step(d, x, other, size, stride, m); + stride <<= m; + } + + unsigned int remaining = (t-1) - m*steps; + if(remaining) + step(d, x, other, size, stride, remaining); + } + + // block distributed parallel wavelet, result is also in block distribution (in-place in x) + inline void wavelet(distribution const & d, double* x, double* next, double* proczero, unsigned int m){ + // First do the local part + base(d, x, next, d.b, m); + + // Then do a fan in (i.e. 2 elements to proc zero) + for(unsigned int i = 0; i < 2; ++i){ + bsp::put(0, &x[i * d.b/2], proczero, d.s * 2 + i); + } + bsp::sync(); + + // proc zero has the privilige/duty to finish the job + if(d.s == 0) { + wvlt::wavelet(proczero, 2*d.p, 1); + // and to send it back to everyone + for(unsigned int t = 0; t < d.p; ++t){ + for(unsigned int i = 0; i < 2; ++i){ + bsp::put(t, &proczero[t*2 + i], x, i * d.b/2); + } + } + } + } + } +} + diff --git a/wavelet/wavelet_parallel_mockup.cpp b/wavelet/wavelet_parallel_mockup.cpp index eed79a4..f8d027e 100644 --- a/wavelet/wavelet_parallel_mockup.cpp +++ b/wavelet/wavelet_parallel_mockup.cpp @@ -1,29 +1,21 @@ #include #include +#include #include #include "wavelet.hpp" -#include "defines.hpp" +#include "wavelet_parallel.hpp" -#ifndef NEXP -// will take about 1.3 GB -#define NEXP 25 -#endif +// Number of iterations to improve time measurements +const unsigned int ITERS = 1; -const unsigned int P = 2; -const unsigned int N = 1 << NEXP; -const unsigned int ITERS = 5; +// Static :(, will be set in main +static unsigned int P; +static unsigned int N; // Static vectors for correctness checking -static std::vector par_result(N); -static std::vector seq_result(N); - -// Convenience container of some often-used values -// NOTE: we use block distribution -// n = inputisze, p = nproc(), s = pid(), b = blocksize -struct distribution { - unsigned int n, p, s, b; -}; +static std::vector par_result; +static std::vector seq_result; // fake data static double data(unsigned int global_index){ @@ -31,7 +23,7 @@ static double data(unsigned int global_index){ } // NOTE: does not synchronize -static void read_and_distribute_data(distribution const & d, double* x){ +static void read_and_distribute_data(wvlt::par::distribution const & d, double* x){ std::vector r(d.b); for(unsigned int t = 0; t < d.p; ++t){ r.assign(d.b, 0.0); @@ -42,51 +34,18 @@ static void read_and_distribute_data(distribution const & d, double* x){ } } -// NOTE: we assume x, next and proczero to be already allocated and bsp::pushed -// NOTE: no sync at the end -// block distributed parallel wavelet, result is also in block distribution (in-place in x) -static void par_wavelet_base(distribution const & d, double* x, double* next, double* proczero){ - for(unsigned int i = 1; i <= d.b/4; i <<= 1){ - // send the two elements over - auto t = (d.s - 1 + d.p) % d.p; - bsp::put(t, &x[0], next, 0); - bsp::put(t, &x[i], next, 1); - bsp::sync(); - - wvlt::wavelet_mul(x, next[0], next[1], d.b, i); - } - - // fan in (i.e. 2 elements to proc zero) - - bsp::sync(); - - // send 2 of your own elements - for(unsigned int i = 0; i < 2; ++i){ - bsp::put(0, &x[i * d.b/2], proczero, d.s * 2 + i); - } - bsp::sync(); - - // proc zero has the privilige/duty to finish the job - if(d.s == 0) { - wvlt::wavelet(proczero, 2*d.p, 1); - // and to send it back to everyone - for(unsigned int t = 0; t < d.p; ++t){ - for(unsigned int i = 0; i < 2; ++i){ - bsp::put(t, &proczero[t*2 + i], x, i * d.b/2); - } - } - } -} - static void par_wavelet(){ bsp::begin(P); - distribution d{N, bsp::nprocs(), bsp::pid(), N/P}; + wvlt::par::distribution d(N, bsp::nprocs(), bsp::pid()); + + unsigned int m = 2; + unsigned int Cm = wvlt::par::communication_size(m); // We allocate and push everything up front, since we need it anyways - // (so peak memory is the same). This saves us 1 bsp::sync + // (so peak memory is the same). This saves us 1 bsp::sync() // For convenience and consistency we use std::vector std::vector x(d.b, 0.0); - std::vector next(2, 0.0); + std::vector next(Cm, 0.0); std::vector proczero(d.s == 0 ? 2*d.p : 1, 0.0); bsp::push_reg(x.data(), x.size()); @@ -100,13 +59,12 @@ static void par_wavelet(){ if(d.s == 0) read_and_distribute_data(d, x.data()); bsp::sync(); + // do the parallel wavelet!!! double time1 = bsp::time(); - for(unsigned int i = 0; i < ITERS; ++i){ - par_wavelet_base(d, x.data(), next.data(), proczero.data()); + wvlt::par::wavelet(d, x.data(), next.data(), proczero.data(), m); bsp::sync(); } - double time2 = bsp::time(); if(d.s==0) printf("parallel version\t%f\n", time2 - time1); @@ -187,6 +145,42 @@ static void check_inverse(double threshold){ } int main(int argc, char** argv){ + namespace po = boost::program_options; + + // Describe program options + po::options_description opts; + opts.add_options() + ("p", po::value(), "number of processors") + ("n", po::value(), "number of elements") + ("help", po::value(), "show this help") + ("check", po::value(&should_check), "enables correctness checks"); + po::variables_map vm; + + // Parse and set options + try { + po::store(po::parse_command_line(argc, argv, opts), vm); + po::notify(vm); + + if(vm.count("help")){ + std::cout << "Parallel wavelet mockup" << std::endl; + std::cout << opts << std::endl; + return 0; + } + + N = vm["n"].as(); + P = vm["p"].as(); + + if(!is_pow_of_two(N)) throw po::error("n is not a power of two"); + if(!is_pow_of_two(P)) throw po::error("p is not a power of two"); + } catch(std::exception& e){ + std::cout << colors::red("ERROR: ") << e.what() << std::endl; + std::cout << opts << std::endl; + return 1; + } + + // Initialise stuff + par_result.assign(N, 0.0); + seq_result.assign(N, 0.0); bsp::init(par_wavelet, argc, argv); // Run both versions (will print timings) @@ -194,8 +188,7 @@ int main(int argc, char** argv){ par_wavelet(); // Checking equality of algorithms - bool should_check = false; - if(should_check){ + if(vm.count("check")){ double threshold = 1.0e-8; check_equality(threshold); check_inverse(threshold);