Коллективные коммуникации
В коллективных коммуникациях участвуют все процессы коммуникатора. Соответствующие функции должны быть вызваны в каждом из процессов. MPI гарантирует корректную совместную работу коллективных коммуникаций и коммуникаций "точка-точка". В MPI содержатся функции следующих типов:
Барьерная синхронизация процессов коммуникатора
Рассылка одних и тех же данных от одного процесса всем (broadcast)
Сбор данных одним процессом от остальных (gather)
Разделение данных одного процесса между всеми процессами (scatter)
Вариация сбора данных, при которой все процессы получают результат (allgather)
Обмен данными "каждый с каждым" (alltoall)
Редукционные операции
Комбинированные операции редукции и разделения
Частичные редукционные операции
Функция MPI_Barrier.
int MPI_Barrier( MPI_Comm comm )
Входной параметр:
comm - коммуникатор
После вызова MPI_Barrier вызвавший процесс блокируется до тех пор, пока все процессы коммуникатора не вызовут эту функцию.
Функция MPI_Bcast.
int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root,
MPI_Comm comm )
INPUT/OUTPUT PARAMETER
buffer - Адрес.
INPUT PARAMETERS
count - Число элементов в буфере
datatype
- тип данных
root - ранг корневого процесса
comm - коммуникатор
Данные, находяшиеся по адресу buffer в процессе с рангом root копируются всем остальным процессам по адресу buffer. (Рисунок 1)
Функция MPI_Gather.
int MPI_Gather ( void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm )
INPUT PARAMETERS
sendbuf
- Адрес, по которому находятся отправляемые данные
sendcnt
- Число отправляемых данных
sendtype
- Тип отправляемых данных
recvcount
- Число принимаемых данных
recvtype
- Тип принимаемых данных (имеет значение только в процессе с рангом root)
root - ранг процесса, который принимает данные.
comm - communicator (handle)
OUTPUT PARAMETER
recvbuf
- Адрес, по которому принимаются данные. (имеет значение только в процессе с рангом root)
Данные из буферов sendbuf в количестве sendcount всех процессов в порядке возрастания рангов копируются в буфер recvbuf процесса с рангом root
Может возникнуть ситуация, когда необходимо собрать в один массив массивы с разным числом элементов. Для этого используется следующая функция
Функция MPI_Gatherv.
int MPI_Gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int *recvcnts, int *displs,
MPI_Datatype recvtype, int root, MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- Адрес, по которому находятся отправляемые данные
sendcount
- Число отправляемых данных (в каждом процессе — свое)
sendtype
- Тип отправляемых данных
recvcounts
- Целочисленный массив размера, равного числу процессов в коммуникаторе;
каждый элемент этого массива содержит число элементов, принимаемых от процесса с рангом,
равным номеру этого элемента.
displs - Целочисленный массив размера, равного числу процессов в коммуникаторе.
i-тый элемент определяет смещение (в элементах) относительно адреса recvbuf,
по которому разместить данные, приходящие от процесса с рангом i.
(имеет значение только в процессе с рангом root)
recvtype
- Тип принимаемых данных (имеет значение только в процессе с рангом root)
root - ранг процесса, который принимает данные.
comm - communicator (handle)
OUTPUT PARAMETER
recvbuf
- Адрес, по которому принимаются данные. (имеет значение только в процессе с рангом root)
Функция MPI_Scatter.
int MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root,
MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- address of send buffer (choice, significant only at root )
sendcount
- number of elements sent to each process (integer, significant only at root )
sendtype
- data type of send buffer elements (significant only at root ) (handle)
recvcount
- number of elements in receive buffer (integer)
recvtype
- data type of receive buffer elements (handle)
root - rank of sending process (integer)
comm - communicator (handle)
OUTPUT PARAMETER
recvbuf
- address of receive buffer (choice)
Функция MPI_Scatterv.
int MPI_Scatterv( void *sendbuf, int *sendcounts, int *displs,
MPI_Datatype sendtype, void *recvbuf, int recvcnt,
MPI_Datatype recvtype,
int root, MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- address of send buffer (choice, significant only at root )
sendcounts
- integer array (of length group size) specifying the number of elements to send to each processor
displs - integer array (of length group size). Entry i specifies the displacement
(relative to sendbuf from which to take the outgoing data to process i
sendtype
- data type of send buffer elements (handle)
recvcount
- number of elements in receive buffer (integer)
recvtype
- data type of receive buffer elements (handle)
root - rank of sending process (integer)
comm - communicator (handle)
OUTPUT PARAMETER
recvbuf
- address of receive buffer (choice)
Функции MPI_Scatter и MPI_Scatterv выполняют действия, обратные функциям MPI_Gather и MPI_Gatherv, а именно, рассылают данные, находящиеся по адресу sendbuf процесса с рангом root, всем процессам коммуникатора.
Пример использования функции MPI_Scatter
#include<mpi.h>
#include<stdio.h>
#include <stdlib.h>
int main(int argc, char** argv)
{
int * data, local;
int rank,size;
int i;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
if(!rank)
{
data=(int*) malloc (sizeof(int)*size);
for(i=0;i<size;i++)
data[i]=10*i;
}
else
data=(int*)0;
MPI_Scatter(data, 1, MPI_INT,&local, 1, MPI_INT, 0, MPI_COMM_WORLD);
printf("Rank %d local: %d\n",rank,local);
MPI_Finalize();
return 0;
}
Функция MPI_Allgather
int MPI_Allgather ( void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
MPI_Comm comm )
INPUT PARAMETERS
sendbuf
- Адрес, по которому находятся отправляемые данные
sendcount
- Число элементов в отправляемом буфере
sendtype
- Тип отправляемых данных
recvcount
- Число элементов в принимаемом буфере
recvtype
- Тип принимаемых данных
comm - коммуникатор
OUTPUT PARAMETER
recvbuf
- Адрес, с которого начнется размещение принятых данных.
Отличие этой функции от MPI_Gather в том, что итоговый массив получают все процессы коммуникатора (рис 3.1)
Функция MPI_Alltoall.
int MPI_Alltoall(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- starting address of send buffer (choice)
sendcount
- number of elements to send to each process (integer)
sendtype
- data type of send buffer elements (handle)
recvcount
- number of elements received from any process (integer)
recvtype
- data type of receive buffer elements (handle)
comm - communicator (handle)
OUTPUT PARAMETER
recvbuf
- address of receive buffer (choice)
Действие этой функции показано на рисунке 3.1
Редукционные операции
Иногда бывает необходимо вычислить значение какой-то величины на основании частичных ее значений, вычисленных в нескольких процессах программы. Например, каждый процесс может вычислить частичную сумму, и на основании этих значений необходимо вычислить полную сумму. Или необходимо выбрать минимум из локальных минимумов и т. д.
В MPI есть следующие встроенные редукционные операции:
MPI_MAX - максимум
MPI_MIN - минимум
MPI_SUM - сумма
MPI_PROD - произведение
MPI_LAND - логическое «и»
MPI_BAND - побитовое «и»
MPI_LOR - логическое «или»
MPI_BOR - побитовое «или»
MPI_LXOR - логическое «исключающеее или»
MPI_BXOR - побитовое «исключающеее или»
MPI_MAXLOC - максимум и его расположение
MPI_MINLOC - минимум и его расположение
Кроме того, есть возможность объявлять пользовательские редукционные операции.
Функция MPI_Reduce.
int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- Адрес, по которому находятся отправляемые данные
count - число элементов в sendbuf, к которым требуется применить операцию редукции
datatype
- Тип данных
op - Редукционная операция
root - ранг процесса, который получит итоговое значение (корневого процесса)
comm - Коммуникатор
OUTPUT PARAMETER
recvbuf
- Адрес, по которому будет записано итоговое значение
(параметр имеет значение только в процессе с рангом root)
Функция MPI_Allreduce.
int MPI_Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, MPI_Comm comm)
INPUT PARAMETERS
sendbuf
- Адрес, по которому находятся отправляемые данные
count - число элементов в sendbuf, к которым требуется применить операцию редукции
datatype
- Тип данных
op - Редукционная операция
comm - Коммуникатор
OUTPUT PARAMETER
recvbuf
- Адрес, по которому будет записано итоговое значение.
Делает то же самое, что и MPI_Reduce за исключением того, что итоговое значение копируется всем процессам коммуникатора.
Пример: распределенный поиск минимума
#include<mpi.h>
#include<stdio.h>
#include<stdlib.h>
int main(int argc, char** argv)
{
int rank,size,i,j;
int length, locallength;
int *displs,*counts;
float* data;
float * local;
float min,localmin;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD,&size);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
if(argc!=2)
{
if(!rank)
fprintf(stderr,"usage: min filename\n");
MPI_Finalize();
exit(1);
}
if(!rank)
{
freopen(argv[1],"r",stdin);
scanf("%d",&length);
data=(float*)malloc(sizeof(float)* length);
for(i=0;i<length;i++)
scanf("%f",data+i);
}
else
{
data=(float*) 0;
}
MPI_Bcast(&length, 1, MPI_INT, 0,MPI_COMM_WORLD);
locallength=length/size+(rank<(length%size));
local=(float*)malloc(sizeof(float)*locallength);
displs=(int*)malloc(sizeof(int)*size);
counts=(int*)malloc(sizeof(int)*size);
for(i=0;i<size;i++)
{
counts[i]=length/size+(i<(length%size));
displs[i]=0;
for(j=0;j<i;j++)
displs[i]+=counts[j];
}
MPI_Scatterv(data,counts, displs,MPI_FLOAT,local,locallength,MPI_FLOAT,0,MPI_COMM_WORLD);
localmin=local[0];
for(i=0;i<locallength;i++)
if(local[i]<localmin)
localmin=local[i];
MPI_Reduce ( &localmin, &min, 1,MPI_FLOAT, MPI_MIN,0, MPI_COMM_WORLD);
if(!rank)
printf("min=%f\n",min);
free(local);
free(data);
free(displs);
free(counts);
MPI_Finalize();
return 0;
}
Задание: Поиск k-той порядковой статистики
Определение: k-той порядковой статистикой x(k) массива x называется к-тый по неубыванию элемент массива x.
Определение: медианой массива x называется x(N/2+1), где N - длина массива.
Обозначение: |x|— число элементов массива x.
Описание последовательного алгоритма: Пусть задан массив S из n элементов и число k. Необходимо найти S(k). Применяется рекурсивная процедура select
val_type select(int k, val_type* S)
{
if(|S|<50)
{
sort(S);
return S[k]
}
else
{
Разбить S на |S|/5 последовательностей по 5 элементов в каждой (останутся не более 4х элементов);
Отсортировать каждую такую последовательность;
Пусть M — последовательность медиан этих пятиэлементных множеств
m=медиана M;
Пусть S1, S2 и S3 — последовательности элементов из S, соответственно меньших, равных и больших m;
if(|S1|>k)
return select(k,S1);
else
if(|S1|+|S2|>k)
return m;
else
return select(k-|S1|-|S2|,S3);
}
}
Описание параллельного алгоритма: Пусть задан массив S из length элементов и число k. Необходимо найти S(k). Применяются рекурсивные функции selectparallel и select
int selectparallel(int k, int* s, int size)
{
Значения k и size известны только в процессе с рангом 0,
поэтому необходимо их разослать
if(size<50) //Ничего не распараллеливаем, считает только 1 процесс
// и рассылает результат всем
{
if(!rank)
{
sort(s,size);
result=s[k];
}
MPI_Bcast(&result,1,MPI_INT,0,MPI_COMM_WORLD);
return result;
}
else
{
0й процесс разрезает массив S между процессами.
Каждый принимает свою часть в массив data
Каждый процесс заполняет массив medians медианами пятерок массива data,
точно так же, как и в select
каждый процесс находит m - медиану medians, с использованием последовательной select
Вызывается функция MPI_Allgather, после чего в каждом процессе есть mm — массив локальных
медиан всех процессов.
m = медиана mm
В каждом процессе заполняются массивы s1,s2,s3 элементами массива data
меньшими, равными, большими m соответственно.Пусть с1,с2, с3 —
число элементов в s1,s2,s3 соответственно.
Вызывается функция MPI_Allreduce и каждый процесс получает
в переменные tc1,tc2,tc3 суммы c1, c2, c3 всех процессов соответственно
if (tc1>k)
{
В массив counts процесса 0 собираются значения с1 всех процессов
Вызывается MPI_Gatherv для сбора элементов из s1 всех процессов по адресу s
процесса 0.
Освобождается ненужная далее память
return selectparallel(k,s,tc1);
}
else if (tc1+tc2>k)
{
освобождается память
return m;
}
else
{
В массив counts процесса 0 собираются значения с3 всех процессов
Вызывается MPI_Gatherv для сбора элементов из s3 всех процессов по адресу s
процесса 0.
Освобождается ненужная далее память
return selectparallel(k-tc1-tc2,s,tc3);
}
}
}
Задание 4
Написать параллельную программу поиска k-той порядковой статистики в массиве за линейное время. Массив считывается из файла, имя которого передается исполняемому файлу в командной строке. Формат файла: в первой строке число элементов массива и номер статистики, далее идут сами элементы через пробел. Тип элементов int.