
MPI

A simple MPI program:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {

    int myrank, sbuf = 23, rbuf = 32;
    MPI_Status status;

    MPI_Init(&argc, &argv);

    /* Find out my identity in the default communicator */
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if (myrank == 0) {
        MPI_Send(&sbuf,           /* message buffer */
                 1,               /* one data item */
                 MPI_INT,         /* data item is an integer */
                 1,               /* destination process rank */
                 99,              /* user-chosen message tag */
                 MPI_COMM_WORLD); /* default communicator */

    } else if (myrank == 1) {     /* only rank 1 expects a message */
        MPI_Recv(&rbuf, 1, MPI_INT, 0, 99, MPI_COMM_WORLD, &status);
        printf("received: %i\n", rbuf);
    }

    MPI_Finalize();
    return 0;
}

MPI supports shared-memory and PGAS-style programming

PGAS: Partitioned Global Address Space

  1. Support for shared memory within shared-memory (SMM) domains through shared-memory windows
  2. Support for remote memory access (RMA) programming: direct use of RDMA through one-sided communication
  3. Enhanced support for message passing communication:
    • Scalable topologies
    • More non-blocking features

The Message-Passing Model

  • A process is (traditionally) a program counter and address space (with resources)
  • Processes may have multiple threads (program counters and associated stacks) sharing a single address space.

MPI is for communication among processes, which have separate address spaces.

Inter-process communication consists of

  1. synchronization
  2. movement of data from one process’s address space to another’s.


Reasons for Using MPI

  1. Standardization - MPI is the only message passing library which can be considered a standard. It is supported on virtually all HPC platforms. Practically, it has replaced all previous message passing libraries
  2. Portability - There is no need to modify your source code when you port your application to a different platform that supports (and is compliant with) the MPI standard
  3. Performance Opportunities - Vendor implementations should be able to exploit native hardware features to optimize performance
  4. Functionality - Rich set of features
  5. Availability - A variety of implementations are available, both vendor and public domain
    • MPICH/Open MPI are popular open-source and free implementations of MPI
    • Vendors and other collaborators take MPICH and add support for their systems

Process Identification

MPI processes can be collected into groups.

  • Each group can have multiple colors (sometimes called context)
  • group + color == communicator (like a name for the group)
  • When an MPI application starts, the group of all processes is initially given a predefined name called MPI_COMM_WORLD
  • The same group can have many names, but simple programs do not have to worry about multiple names

A process is identified by a unique number within each communicator, called rank.

  • For two different communicators, the same process can have two different ranks:
    • the meaning of a “rank” is only defined when you specify the communicator
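
A minimal sketch of how the same process can end up with different ranks in different communicators; the sub-communicator created with MPI_Comm_split and the variable names are illustrative.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int world_rank, world_size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    /* Split MPI_COMM_WORLD into two halves using a "color" */
    int color = (world_rank < world_size / 2) ? 0 : 1;
    MPI_Comm sub;
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &sub);

    int sub_rank;
    MPI_Comm_rank(sub, &sub_rank);
    printf("rank %d in MPI_COMM_WORLD is rank %d in its sub-communicator\n",
           world_rank, sub_rank);

    MPI_Comm_free(&sub);
    MPI_Finalize();
    return 0;
}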


Data Communication

Data communication in MPI is like email exchange:

  • one process sends a copy of the data to another process (or a group of processes),
  • and the other process receives it.

Send operations

Communication requires the following information

Sender has to know:

  • Whom to send the data to (receiver’s process rank)
  • What kind of data to send (100 integers or 200 characters, etc)
  • A user-defined “tag” for the message
    • think of it as an email subject
    • it allows the receiver to understand what type of data is being received

Receiver “might” have to know:

  • Who is sending the data
    • OK if the receiver does not know; in this case the receiver specifies MPI_ANY_SOURCE as the source rank, meaning a message from any sender will match
  • What kind of data is being received
    • partial information is OK: I might receive up to 1000 integers
  • What the user-defined “tag” of the message is
    • OK if the receiver does not know; in this case tag will be MPI_ANY_TAG

When sending data, the sender has to specify the destination process’ rank.

The receiver has to specify the source process’ rank.

MPI_ANY_SOURCE is a special “wild-card” source that can be used by the receiver to match any source.

More Details on User “Tags” for Communication

Messages are sent with an accompanying user-defined integer tag, to assist the receiving process in identifying the message. For example, if an application is expecting two types of messages from a peer, tags can help distinguish these two types.

MPI_ANY_TAG is a special “wild-card” tag that can be used by the receiver to match any tag.
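
A minimal receive-side sketch, assuming MPI has already been initialized and a buffer of up to 1000 integers is acceptable: the wild-cards match any sender and any tag, and the status object reveals what actually arrived.

int buf[1000];
MPI_Status status;
MPI_Recv(buf, 1000, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
         MPI_COMM_WORLD, &status);

int count;
MPI_Get_count(&status, MPI_INT, &count);   /* how many integers arrived */
int sender = status.MPI_SOURCE;            /* who actually sent the message */
int tag    = status.MPI_TAG;               /* which tag it carried */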

Non-Blocking Communication

  • Non-blocking (asynchronous) operations return (immediately) “request handles” that can be waited on and queried
    • MPI_ISEND(start, count, datatype, dest, tag, comm, request)
    • MPI_IRECV(start, count, datatype, src, tag, comm, request)
    • MPI_WAIT(request, status)
  • Non-blocking operations allow overlapping computation and communication
  • One can also test without waiting using MPI_TEST
    • MPI_TEST(request, flag, status)
  • Anywhere you use MPI_SEND or MPI_RECV, you can use the pair of MPI_ISEND/MPI_WAIT or MPI_IRECV/MPI_WAIT
  • Combinations of blocking and non-blocking sends/receives can be used to synchronize execution instead of barriers
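
A minimal sketch of overlapping computation with communication, assuming MPI is initialized and left/right are valid neighbour ranks (the function and variable names are illustrative):

void exchange(int left, int right, double *sendbuf, double *recvbuf, int n)
{
    MPI_Request reqs[2];
    MPI_Status  stats[2];

    /* post the operations; both calls return immediately */
    MPI_Irecv(recvbuf, n, MPI_DOUBLE, left,  0, MPI_COMM_WORLD, &reqs[0]);
    MPI_Isend(sendbuf, n, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &reqs[1]);

    /* ... computation that does not touch the buffers can run here ... */

    MPI_Wait(&reqs[0], &stats[0]);   /* the receive is complete after this */
    MPI_Wait(&reqs[1], &stats[1]);   /* the send buffer may be reused after this */
}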

Multiple Completions

It is sometimes desirable to wait on multiple requests:

  • MPI_Waitall(count, array_of_requests, array_of_statuses)
  • MPI_Waitany(count, array_of_requests, &index, &status)
  • MPI_Waitsome(count, array_of_requests, array_of_indices, array_of_statuses)

There are corresponding versions of test for each of these
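
A minimal sketch using MPI_Waitany to handle messages in whatever order they complete, assuming nreq requests were posted earlier with MPI_Irecv into the array reqs:

for (int done = 0; done < nreq; done++) {
    int idx;
    MPI_Status status;
    MPI_Waitany(nreq, reqs, &idx, &status);
    /* reqs[idx] has completed; process the corresponding buffer here */
}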

Introduction to Collective Operations in MPI

MPI also supports communication among groups of processes

  • not absolutely necessary for programming (but very nice!)
  • essential for performance

Collective operations are called by all processes in a communicator.

  • MPI_BCAST distributes data from one process (the root) to all others in a communicator.
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
               int root, MPI_Comm comm)
int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype,
               int root, MPI_Comm comm, MPI_Request *request)

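A minimal usage sketch, assuming MPI is initialized, myrank was obtained with MPI_Comm_rank, and the parameter name nsteps is illustrative:

int nsteps = 0;
if (myrank == 0)
    nsteps = 1000;                 /* e.g. read from an input file on the root */
MPI_Bcast(&nsteps, 1, MPI_INT, 0, MPI_COMM_WORLD);
/* after the call, every rank in MPI_COMM_WORLD holds the same nsteps */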

  • MPI_REDUCE combines data from all processes in the communicator and returns it to one process.
int MPI_Reduce(const void *sendbuf, void *recvbuf, int count,
               MPI_Datatype datatype, MPI_Op op, int root,
               MPI_Comm comm)

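A minimal usage sketch, assuming each rank has already computed local_sum (an illustrative name): rank 0 receives the global sum.

double local_sum = 0.0;            /* partial result computed on this rank */
double global_sum = 0.0;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
           MPI_COMM_WORLD);
/* only rank 0's global_sum is defined after the call */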

In many numerical algorithms, SEND/RECV can be replaced by BCAST/REDUCE, improving both simplicity and efficiency.

Communication and computation is coordinated among a group of processes in a communicator.

Tags are not used; different communicators deliver similar functionality.

(figure: collective data-movement routines such as MPI_Allgather and MPI_Alltoall)

Non-blocking collective operations in MPI-3

There are 3 classes of operations:

  • synchronization,
  • data movement,
  • collective computation.

Synchronization

MPI_Barrier(comm) blocks until all processes in the group of the communicator call it.

A process cannot get out of the barrier until all other processes have reached the barrier

  • “All” versions deliver results to all participating processes
  • “V” versions (stands for vector) allow the chunks to have different sizes

MPI Built-in Collective Computation Operations

Operation     Description
MPI_MAX       Maximum
MPI_MIN       Minimum
MPI_PROD      Product
MPI_SUM       Sum
MPI_LAND      Logical AND
MPI_LOR       Logical OR
MPI_LXOR      Logical XOR
MPI_BAND      Bitwise AND
MPI_BOR       Bitwise OR
MPI_BXOR      Bitwise XOR
MPI_MAXLOC    Maximum and location
MPI_MINLOC    Minimum and location

Reduce Implementation: a tree-structured global sum

(figure: tree-structured global sum across eight processes)

  1. In the first phase:
    1. Process 1 sends to 0, 3 sends to 2, 5 sends to 4, and 7 sends to 6.
    2. Processes 0, 2, 4, and 6 add in the received values.
  2. Second phase:
    1. Processes 2 and 6 send their new values to processes 0 and 4, respectively.
    2. Processes 0 and 4 add the received values into their new values.
  3. Finally:
    1. Process 4 sends its newest value to process 0.
    2. Process 0 adds the received value to its newest value.
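
A minimal sketch of the tree-structured sum described above, written with point-to-point calls purely for illustration (in practice you would simply call MPI_Reduce). It assumes myrank and nprocs were obtained from MPI_Comm_rank and MPI_Comm_size.

int value = myrank;                     /* each rank's contribution */
for (int step = 1; step < nprocs; step *= 2) {
    if (myrank % (2 * step) == step) {
        /* partner at this level: send the partial sum and drop out */
        MPI_Send(&value, 1, MPI_INT, myrank - step, 0, MPI_COMM_WORLD);
        break;
    } else if (myrank % (2 * step) == 0 && myrank + step < nprocs) {
        int recvd;
        MPI_Recv(&recvd, 1, MPI_INT, myrank + step, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        value += recvd;                 /* accumulate the received partial sum */
    }
}
/* after the loop, rank 0 holds the global sum */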

Broadcast Implementation

A broadcast can use the same tree structure in reverse: the root sends the data to a few processes, which then forward it down the tree until every process has a copy.

Allreduce != Reduce + Broadcast

One- and Two-sided Communication

The basic idea of one-sided communication models is to decouple data movement from process synchronization

  • Should be able to move data without requiring that the remote process synchronize
  • Each process exposes a part of its memory to other processes
  • Other processes can directly read from or write to this memory

(figures: global address space model; timing of two-sided vs. one-sided communication)

MPI Remote Memory Access

Creating Public Memory

Any memory used by a process is, by default, only locally accessible

  • X = malloc(100);

Once the memory is allocated, the user has to make an explicit MPI call to declare a memory region as remotely accessible

  • MPI terminology for remotely accessible memory is a “window”
  • A group of processes collectively create a “window”

Once a memory region is declared as remotely accessible, all processes in the window can read/write/update data to this memory without explicitly synchronizing with the target process

  • MPI routines usually required for read/write/update of remote data

Window creation models

Four models exist:

  • MPI_WIN_ALLOCATE
    • You want to create a buffer and directly make it remotely accessible
    • Create a remotely accessible memory region in an RMA window
    • Only data exposed in a window can be accessed with RMA ops
  • MPI_WIN_CREATE
    • You already have an allocated buffer that you would like to make remotely accessible
  • MPI_WIN_CREATE_DYNAMIC
    • You don’t have a buffer yet, but will have one in the future
    • You may want to dynamically add/remove buffers to/from the window
  • MPI_WIN_ALLOCATE_SHARED
    • You want multiple processes on the same node to share a buffer
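
A minimal sketch of the two most common creation calls, assuming MPI is initialized and <stdlib.h> is included; the sizes and variable names are illustrative.

MPI_Win win;
double *buf;

/* MPI_WIN_ALLOCATE: MPI allocates the remotely accessible buffer for you */
MPI_Win_allocate(1000 * sizeof(double), sizeof(double),
                 MPI_INFO_NULL, MPI_COMM_WORLD, &buf, &win);
MPI_Win_free(&win);

/* MPI_WIN_CREATE: expose a buffer you have already allocated */
double *mine = malloc(1000 * sizeof(double));
MPI_Win_create(mine, 1000 * sizeof(double), sizeof(double),
               MPI_INFO_NULL, MPI_COMM_WORLD, &win);
MPI_Win_free(&win);
free(mine);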

Data movement

MPI provides ability to read, write and atomically update (modify) data in remotely accessible memory regions

  • MPI_PUT
    • Move data from origin to target
    • Separate data description triples for origin and target
  • MPI_GET
    • Move data to origin from target
    • Separate data description triples for origin and target
  • MPI_ACCUMULATE (atomic)
  • MPI_GET_ACCUMULATE (atomic)
  • MPI_COMPARE_AND_SWAP (atomic)
  • MPI_FETCH_AND_OP (atomic)
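
A minimal sketch of MPI_Put and MPI_Get inside a fence epoch (fences are discussed below); the window win is assumed to expose an array of doubles on every rank.

MPI_Win_fence(0, win);                     /* open an epoch on all ranks */

double x = 3.14;
MPI_Put(&x, 1, MPI_DOUBLE,                 /* origin: one double from &x */
        1, 5, 1, MPI_DOUBLE, win);         /* target: rank 1, displacement 5 */

double y;
MPI_Get(&y, 1, MPI_DOUBLE,                 /* origin: receive one double into &y */
        2, 0, 1, MPI_DOUBLE, win);         /* target: rank 2, displacement 0 */

MPI_Win_fence(0, win);                     /* all transfers complete here */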

Atomic: Accumulate

Atomic update operation (similar to a put)

  • Reduces origin and target data into target buffer using op argument as combiner
  • Available operations:
    • MPI_SUM,
    • MPI_PROD,
    • MPI_OR,
    • MPI_REPLACE,
    • MPI_NO_OP
  • Predefined ops only, no user-defined operations

With Op = MPI_REPLACE, you implement an atomic PUT

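A minimal sketch, assuming a window win of longs and using the passive-target lock described later: each rank atomically adds its contribution into a counter at displacement 0 on rank 0.

long contribution = 1;
MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
MPI_Accumulate(&contribution, 1, MPI_LONG,   /* origin data */
               0, 0,                         /* target rank 0, displacement 0 */
               1, MPI_LONG, MPI_SUM, win);   /* combine with MPI_SUM */
MPI_Win_unlock(0, win);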

Atomic: Get Accumulate

Atomic read-modify-write

  • Available operations:
    • MPI_SUM,
    • MPI_PROD,
    • MPI_OR,
    • MPI_REPLACE,
    • MPI_NO_OP
  • You can use predefined operations only
  • Result stored in target buffer
  • Original data stored in result buffer
  • Different data layouts between target/origin
  • Basic type elements must match

  • Atomic GET: Get_accumulate with op = MPI_NO_OP
  • Atomic SWAP: Get_accumulate with op = MPI_REPLACE
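
A minimal sketch of MPI_Get_accumulate, again assuming a window win of longs on rank 0: the previous value is fetched into old while inc is atomically added at the target.

long inc = 5, old;
MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
MPI_Get_accumulate(&inc, 1, MPI_LONG,        /* origin: value to combine */
                   &old, 1, MPI_LONG,        /* result: previous target contents */
                   0, 0, 1, MPI_LONG,        /* target rank, displacement, count */
                   MPI_SUM, win);
MPI_Win_unlock(0, win);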

Atomic Fetch_and_op, Compare_and_swap

FOP: Simpler version of MPI_Get_accumulate

  • All buffers share a single predefined datatype
  • No count argument (it’s always 1)
  • Simpler interface allows hardware optimization

CAS: Atomic swap if target value is equal to compare value
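
A minimal sketch of the single-element atomics, assuming a shared counter of type long at displacement 0 in rank 0's window win:

long one = 1, fetched;
MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);

/* fetch-and-add: fetched receives the old value, the target becomes old + 1 */
MPI_Fetch_and_op(&one, &fetched, MPI_LONG, 0, 0, MPI_SUM, win);

/* compare-and-swap: write newval only if the target currently equals expected */
long expected = 0, newval = 42, result;
MPI_Compare_and_swap(&newval, &expected, &result, MPI_LONG, 0, 0, win);

MPI_Win_unlock(0, win);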

Ordering of Operations in MPI RMA

No guaranteed ordering for Put/Get operations

  • Result of concurrent Puts to the same location undefined
  • Result of Get concurrent with Put/Accumulate undefined
  • Can be garbage in both cases

Results of concurrent accumulate operations to the same location are defined according to the order in which they occurred

  • Atomic put: Accumulate with op = MPI_REPLACE
  • Atomic get: Get_accumulate with op = MPI_NO_OP

Accumulate operations from a given process are ordered by default.

  • As an optimization hint, the user can tell the MPI implementation that this ordering is not required, via an info argument passed when the window is created (see the sketch below)
  • You can ask for only the needed orderings: RAW (read-after-write), WAR, RAR, or WAW
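
A minimal sketch, assuming the standard "accumulate_ordering" window info key: ordering is relaxed to read-after-write and write-after-write only for the window being created.

MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "accumulate_ordering", "raw,waw");  /* drop RAR and WAR ordering */

MPI_Win win;
double *buf;
MPI_Win_allocate(1000 * sizeof(double), sizeof(double),
                 info, MPI_COMM_WORLD, &buf, &win);    /* hint applies to this window */
MPI_Info_free(&info);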

Examples with operation ordering

(figure: examples of accumulate operation ordering)

RMA Synchronization Models

RMA data access model:

  • When is a process allowed to read/write remotely accessible memory?
  • When is data written by process X available for process Y to read?

During an exposure epoch:

  1. You should not perform local accesses to the memory window.
  2. Only one remote process can issue MPI_Put.
  3. There can be multiple MPI_Accumulate function calls.

RMA synchronization models define these semantics

  • 3 synchronization models provided by MPI:
    • Fence (active target)
    • Post-start-complete-wait (generalized active target)
    • Lock/Unlock (passive target)

Data accesses occur within epochs

  • Access epochs (called by origin process):
    • contain a set of operations issued by an origin process
    • there can be multiple access epochs within the same exposure epoch
  • Exposure epochs (called by target process):
    • enable remote processes to update a target’s window
    • target process makes known to potential origin processes the availability of its memory window

Epochs define ordering and completion semantics

  • Synchronization models provide mechanisms for establishing epochs
    • e.g., starting, ending, and synchronizing epochs

Fence: Active Target Synchronization

MPI_Win_fence(int assert, MPI_Win win)

Collective synchronization model

  • Starts and ends access and exposure epochs on all processes in the window
  • All processes in group of “win” do an MPI_WIN_FENCE to open an epoch
  • Everyone can issue PUT/GET operations to read/write data
  • Everyone does an MPI_WIN_FENCE to close the epoch
  • All operations complete at the second fence synchronization
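
A minimal sketch of a fence-synchronized exchange, assuming a window win over one int per rank (its local buffer is called win_buf here) and myrank/nprocs from MPI_Comm_rank/MPI_Comm_size: every rank puts its rank into its right neighbour's window.

MPI_Win_fence(0, win);                        /* everyone opens the epoch */

int right = (myrank + 1) % nprocs;
MPI_Put(&myrank, 1, MPI_INT, right, 0, 1, MPI_INT, win);

MPI_Win_fence(0, win);                        /* everyone closes the epoch */
/* win_buf[0] now holds the left neighbour's rank on every process */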

PSCW: Generalized Active Target Synchronization

PSCW: Post-Start-Complete-Wait

MPI_Win_post/start(MPI_Group grp, int assert, MPI_Win win)
MPI_Win_complete/wait(MPI_Win win)
  • Like FENCE, but origin and target specify who they communicate with
  • Target: Exposure epoch
    • Opened with MPI_Win_post
    • Closed by MPI_Win_wait
  • Origin: Access epoch
    • Opened with MPI_Win_start
    • Closed by MPI_Win_complete
  • All synchronization operations may block, to enforce P-S/C-W ordering
  • Processes can be both origins and targets

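A minimal sketch of PSCW between two processes, assuming exactly two ranks and a window win exposing one int on rank 1; the group handling and values are illustrative.

MPI_Group world_grp, peer_grp;
MPI_Comm_group(MPI_COMM_WORLD, &world_grp);

if (myrank == 0) {                       /* origin: access epoch */
    int target = 1, val = 99;
    MPI_Group_incl(world_grp, 1, &target, &peer_grp);
    MPI_Win_start(peer_grp, 0, win);
    MPI_Put(&val, 1, MPI_INT, 1, 0, 1, MPI_INT, win);
    MPI_Win_complete(win);               /* the put is complete on the origin side */
} else if (myrank == 1) {                /* target: exposure epoch */
    int origin = 0;
    MPI_Group_incl(world_grp, 1, &origin, &peer_grp);
    MPI_Win_post(peer_grp, 0, win);
    MPI_Win_wait(win);                   /* the data is now visible locally */
}
MPI_Group_free(&peer_grp);
MPI_Group_free(&world_grp);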

Lock/Unlock: Passive Target Synchronization


Passive mode: One-sided, asynchronous communication

  • Target does not participate explicitly in communication operation
  • Shared memory-like model
    • (in the sense that the operations are all called on the origin process, with no synchronization routines called on the target process)
  • “Lock” is an unfortunate choice of name: it is not mutual exclusion, but rather marks the begin/end of a set of RMA operations
MPI_Win_lock(int locktype, int rank, int assert, MPI_Win win)
MPI_Win_unlock(int rank, MPI_Win win)
MPI_Win_flush/flush_local(int rank, MPI_Win win)

Lock/Unlock: Begin/end passive mode epoch

  • Target process does not make a corresponding MPI call
  • Can initiate multiple passive target epochs to different processes
  • Concurrent epochs to same process not allowed (affects threads)

Lock type:

  • SHARED: Other processes using shared can access concurrently
  • EXCLUSIVE: No other processes can access concurrently

Flush: remotely complete RMA operations to the target process

  • After completion, data can be read by target process or a different process

Flush_local: locally complete RMA operations to the target process
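
A minimal sketch of a passive-target update, assuming a window win of ints: the origin locks rank 1's window, puts a value, and unlocks, while rank 1 makes no MPI call at all.

int val = 7;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 1, 0, win);
MPI_Put(&val, 1, MPI_INT, 1, 0, 1, MPI_INT, win);
MPI_Win_flush(1, win);       /* optional: remotely complete before unlocking */
MPI_Win_unlock(1, win);      /* epoch ends; the data is visible at the target */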

Advanced Passive Target Synchronization

MPI_Win_lock_all(int assert, MPI_Win win)
MPI_Win_unlock_all(MPI_Win win)
MPI_Win_flush_all(MPI_Win win)
MPI_Win_flush_local_all(MPI_Win win)
  • Lock_all: Shared lock, passive target epoch to all other processes
    • Expected usage is long-lived: lock_all, put/get, flush, …, unlock_all
  • Flush_all: remotely complete RMA operations to all processes
  • Flush_local_all: locally complete RMA operations to all processes
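
A minimal sketch of the long-lived lock_all idiom, assuming a window win of doubles spread over all ranks:

MPI_Win_lock_all(0, win);                  /* shared epochs to every rank */

double x = 1.0;
MPI_Put(&x, 1, MPI_DOUBLE, 1, 0, 1, MPI_DOUBLE, win);
MPI_Win_flush(1, win);                     /* this put is now complete at rank 1 */

/* ... many more puts/gets/flushes over the lifetime of the window ... */

MPI_Win_unlock_all(win);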

Which synchronization mode should I use, when?

  • RMA communication has low overheads versus send/recv
    • Two-sided: Matching, queuing, buffering, unexpected receives, etc…
    • One-sided: No matching, no buffering, always ready to receive
    • Utilize RDMA provided by high-speed interconnects (e.g. InfiniBand)
  • Active mode: bulk synchronization
    • e.g. ghost cell exchange
  • Passive mode: asynchronous data movement
    • Useful when dataset is large, requiring memory of multiple nodes
    • Also, when data access and synchronization pattern is dynamic
    • Common use case: distributed, shared arrays
  • Passive target locking mode
    • Lock/unlock: useful when exclusive epochs are needed
    • Lock_all/unlock_all: useful when only shared epochs are needed

Assignment

  • Write an MPI program which sends data in a “ring” among all P processes, i.e., rank i sends to (i+1)%P, which forwards the received data to (i+2)%P, etc.
  • Compare the performance of two-sided and one-sided communication operations