Для C++ существует API позволяющее очень красиво распараллелить вычисления на многоядерную архитектуру процессора - OpenMP.

Это не единственная модель вычислений, часто удобно просто явным образом запустить отдельный поток вычислений, или явно работать с защелкой (lock), или использовать другие примитивы. Но сегодня мы обсудим один из самых популярных и часто используемых вариантов в случае когда речь идет о C++ и обработке картинок - распараллеливание обработки в цикле.

P.S. не забудьте компилировать это задание в Release.

Распараллеливание цикла

Представим что есть цикл который увеличивает каждый элемент вектора в два раза:

std::vector<int> xs = ...;

for (int i = 0; i < xs.size(); ++i) {
    xs[i] *= 2;
}

Вот такое указание скажет компилятору что мы хотим выполнить цикл в несколько потоков чтобы использовать все ядра (и виртуальные потоки) процессора.

#pragma omp parallel for // omp = OpenMP, parallel = запустить потоки, for = распределить по ним индексы этого цикла
for (int i = 0; i < xs.size(); ++i) {
    xs[i] *= 2;
}

Иначе говоря каждый индекс i для которого этот цикл должен быть выполнен - достанется для выполнения одному из потоков.

Критическая секция

Что если мы хотим суммировать все числа массива? В чем проблема такого, казалось бы, естественного решения?

long long totalSum = 0;
#pragma omp parallel for // parallel = запустить потоки, for = распределить по ним индексы этого цикла
for (int i = 0; i < xs.size(); ++i) {
    totalSum += xs[i];
}

В том что здесь происходит состояние гонки при считывании и записи текущего значения totalSum. Ведь мы одновременно делает это из разных потоков.

Как это можно исправить? Правильно, воспользовавшись lock-ом (защелкой, по сути замком на двери уборной), который будет гарантировать нам что лишь один из потоков сейчас работает с переменной totalSum.

long long totalSum = 0; // используем long long (64-битное число) чтобы при суммировании int - не переполнился результат
#pragma omp parallel for
for (int i = 0; i < xs.size(); ++i) {
    #pragma omp critical // omp = OpenMP, critical = это критическая секция кода, в нее потоки заходят по одному
    {
        totalSum += xs[i];
    }
}

Какова будет производительность (скорость работы) такого кода? Почему?

Как бы это сделать лучше?

Reduction

Очень часто хочется делать подобные операции как в прошлой функции - объединение результата между разными потоками.

Т.е. по сути хочется применить некую операцию между результатами работы всех потоков, такое преобразование называется редукция (reduction).

Причем операции бывают разные - например суммирование (или перемножение, взятие максимума, взятие минимума).

Если мы укажем что потоки должны провести редукцию над totalSum, то каждый поток сделает свою личную копию этой переменной, будет суммировать свои элементы в эту копию (по сути накапливая свою частичную сумму), и когда весь цикл будет обработан - все потоки соберутся, покажут друг другу накопленное значение этой переменной, и объединив результаты - запишут в оригинальную переменную totalSum общий результат.

long long totalSum = 0;
#pragma omp parallel for reduction(+: totalSum) // в скобочках идут параметры редукции - сначала идет операция, затем - название переменной-аккумулятора
for (int i = 0; i < xs.size(); ++i) {
    totalSum += xs[i];
}

Насколько быстро работает такой код относительно однопоточной версии и версии с critical секцией? Почему?

Какие еще операции ложаться на эту парадигму? Какое свойство от них требуется? Ложиться ли например на эту идею операция вычитания? А взятие наибольшего общего делителя?

Самописный Reduction

Что нужно чтобы суметь написать редукцию самостоятельно (просто из любопытства и чтобы научиться писать более сложные вещи)?

1) Запустить каждый поток

2) В каждом потоке завести аккумулятор для частичной суммы (его личная копилка)

3) Пройтись по циклу и в каждом потоке добавлять элементы в свою переменную

4) Просуммировать все частичные суммы в общую переменную

long long totalSum = 0;
int threadsN = 0;
#pragma omp parallel // ЗДЕСЬ НЕТ for, но есть parallel = говорим что эту секцию хочется запустить для каждого потока процессора
{
    int threadId = -1;
    #pragma omp critical // в критической секции рассчитаемся по номерам потоков и выведем в консоль что такой-то поток был запущен
    {    
        threadId = threadsN;
        std::cout << "Thread #" << threadId << " started..." << std::endl;
        ++threadsN;
    }
    long long threadSum = 0;
    #pragma omp for // ЗДЕСЬ НЕТ parallel, т.к. это ключевое слово говорит "запускай потоки", но потоки уже запущены,
    for (int i = 0; i < xs.size(); ++i) { // осталось лишь распределить среди них вычислительную рабочую нагрузку
        threadSum += xs[i]; // почему здесь не нужна критическая секция?
    }
    #pragma omp critical
    {
        totalSum += threadSum;
        std::cout << "Thread #" << threadId << " finished!" << std::endl;
    }
}

А зачем здесь первая критическая секция? Нельзя ли ее сделать без синхронизации? Попробуйте.

А зачем здесь вторая критическая секция? Нельзя ли ее сделать без синхронизации? Попробуйте.

Как распределяются вычисления?

Как выяснить какие индексы цикла достаются потоку при вычислении? Попробуйте придумать эксперимент который позволит это выяснить опытным путем.