#define Mheader "SharedMemoryMpi: "

#ifdef GRID_CUDA
#include <cuda_runtime_api.h>
#endif
#ifdef GRID_HIP
#include <hip/hip_runtime_api.h>
#endif
#ifdef GRID_SYCL
#ifdef ACCELERATOR_AWARE_MPI
#define GRID_SYCL_LEVEL_ZERO_IPC
#endif
#endif

#warning " Using NUMAIF "

#include <sys/socket.h>
#include <sys/un.h>
static const char *sock_path_fmt = "/tmp/GridUnixSocket.%d";
static char sock_path[256];
static void Open(int rank)
{
  // socket() returns -1 on failure; descriptor 0 is legal, so test >= 0
  sock = socket(AF_UNIX, SOCK_DGRAM, 0); assert(sock >= 0);

  struct sockaddr_un sa_un = { 0 };
  sa_un.sun_family = AF_UNIX;
  snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), sock_path_fmt, rank);
  unlink(sa_un.sun_path);
  if (bind(sock, (struct sockaddr *)&sa_un, sizeof(sa_un))) {
    perror("bind failure");
    exit(EXIT_FAILURE);
  }
}
static int RecvFileDescriptor(void)
{
  int n;
  int fd;
  char buf[1];
  struct iovec iov;
  struct msghdr msg;
  struct cmsghdr *cmsg;
  char cms[CMSG_SPACE(sizeof(int))];

  // A one-byte payload must be received along with the ancillary data
  iov.iov_base = buf;
  iov.iov_len  = 1;

  memset(&msg, 0, sizeof msg);
  msg.msg_iov        = &iov;
  msg.msg_iovlen     = 1;
  msg.msg_control    = (caddr_t)cms;
  msg.msg_controllen = sizeof cms;

  if ((n = recvmsg(sock, &msg, 0)) < 0) {
    perror("recvmsg failed");
    return -1;
  }
  if (n == 0) {
    perror("recvmsg returned 0");
    return -1;
  }
  cmsg = CMSG_FIRSTHDR(&msg);
  memmove(&fd, CMSG_DATA(cmsg), sizeof(int));
  return fd;
}
static void SendFileDescriptor(int fildes, int xmit_to_rank)
{
  struct msghdr msg;
  struct iovec iov;
  struct cmsghdr *cmsg = NULL;
  char ctrl[CMSG_SPACE(sizeof(int))];
  char data = ' ';

  memset(&msg, 0, sizeof(struct msghdr));
  memset(ctrl, 0, CMSG_SPACE(sizeof(int)));
  iov.iov_base = &data;
  iov.iov_len  = sizeof(data);

  snprintf(sock_path, sizeof(sock_path), sock_path_fmt, xmit_to_rank);

  struct sockaddr_un sa_un = { 0 };
  sa_un.sun_family = AF_UNIX;
  snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), sock_path_fmt, xmit_to_rank);

  msg.msg_name       = (void *)&sa_un;
  msg.msg_namelen    = sizeof(sa_un);
  msg.msg_iov        = &iov;
  msg.msg_iovlen     = 1;
  msg.msg_controllen = CMSG_SPACE(sizeof(int));
  msg.msg_control    = ctrl;

  cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type  = SCM_RIGHTS;
  cmsg->cmsg_len   = CMSG_LEN(sizeof(int));

  // Place the descriptor in the ancillary data; the kernel dup()s it
  // into the receiving process
  *((int *)CMSG_DATA(cmsg)) = fildes;

  sendmsg(sock, &msg, 0);
}
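// Usage sketch for the pair above (illustrative only; 'my_rank', 'peer' and
// 'fd' are placeholders, not names from this file). Each rank first binds its
// own datagram socket, after which descriptors can be pushed point to point.
// Note the integer that arrives need not equal the integer sent: SCM_RIGHTS
// installs a fresh descriptor in the receiver's table.
//
//   UnixSockets::Open(my_rank);                        // bind /tmp/GridUnixSocket.<my_rank>
//   UnixSockets::SendFileDescriptor(fd, peer);         // SCM_RIGHTS transfer to rank 'peer'
//   int received = UnixSockets::RecvFileDescriptor();  // pick up a descriptor sent to us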
#ifndef GRID_MPI3_SHM_NONE
  MPI_Comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &WorldShmComm);
  std::cout << Mheader " World communicator of size " << WorldSize << std::endl;
  MPI_Group WorldGroup, ShmGroup;

  std::vector<int> world_ranks(WorldSize);
  for(int r=0;r<WorldSize;r++) world_ranks[r]=r;
  std::vector<int> MyGroup;

  std::sort(MyGroup.begin(),MyGroup.end(),std::less<int>());
  int myleader = MyGroup[0];

  std::vector<int> leaders_1hot(WorldSize,0);
  leaders_1hot[myleader] = 1;

  int ierr=MPI_Allreduce(MPI_IN_PLACE,&leaders_1hot[0],WorldSize,MPI_INT,MPI_SUM,WorldComm);
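  // Leader election by one-hot allreduce: every rank sets a 1 at the world
  // rank of its group's smallest member, and the MPI_SUM reduction leaves a
  // 1 in exactly the leaders' slots on every rank. For example, four ranks
  // on two nodes with groups {0,1} and {2,3} give leaders_1hot = {1,0,1,0}.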
  leaders_group[group++] = l;

  if (myleader == leaders_group[g]){

  for(int i=0;i<=MAXLOG2;i++){
    if ( (0x1<<i) == TwoToPower ) {
  const int namelen = _POSIX_HOST_NAME_MAX;
  gethostname(name,namelen);
  int nscan = sscanf(name,"r%di%dn%d",&R,&I,&N);

  assert(log2size != -1);
  const int maxhdim = 10;
  std::vector<int> HyperCubeCoords(maxhdim,0);
  std::vector<int> RootHyperCubeCoords(maxhdim,0);

  const int namelen = _POSIX_HOST_NAME_MAX;
  gethostname(name,namelen);
  int nscan = sscanf(name,"r%di%dn%d",&R,&I,&N);
  uint32_t hypercoor = (R<<8)|(I<<5)|(nhi<<3)|nlo;
  uint32_t rootcoor  = hypercoor;
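  // The packing follows the hostname convention parsed above ("r<R>i<I>n<N>",
  // apparently a rack/chassis/node naming scheme): rack in bits 8 and up,
  // chassis in bits 5-7, and the node number's high and low fields below
  // that. Physically adjacent nodes then differ only in the low-order bits
  // of hypercoor, which is what the hypercube embedding below exploits.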
  for(int d=0;d<maxhdim;d++){
    HyperCubeCoords[d] = (hypercoor>>d)&0x1;
  }

  std::string hname(name);
  MPI_Bcast(&rootcoor, sizeof(rootcoor), MPI_BYTE, 0, WorldComm);
  hypercoor = hypercoor - rootcoor;
  // Note: hypercoor is unsigned, so this assert can never fire; the
  // subtraction assumes the root's coordinate is the smallest
  assert(hypercoor >= 0);
  for(int d=0;d<maxhdim;d++){
    HyperCubeCoords[d] = (hypercoor>>d)&0x1;
  }
  int ndimension = processors.size();

  for(int d=0;d<ndimension;d++){
    NodeDims[d] = WorldDims[d]/ShmDims[d];
  }
  int hcoor = hypercoor;
  for(int d=0;d<ndimension;d++){
    int bits = Log2Size(NodeDims[d],MAXLOG2RANKSPERNODE);
    int msk  = (0x1<<bits)-1;
    HyperCoor[d] = hcoor & msk;
    HyperCoor[d] = BinaryToGray(HyperCoor[d]); // space-filling curve ordering
    hcoor = hcoor >> bits;
  }
  for(int i=0;i<ndimension;i++){
    Nprocessors*=processors[i];
  }

  Lexicographic::CoorFromIndexReversed(NodeCoor, WorldNode, NodeDims);
  for(int d=0;d<ndimension;d++) NodeCoor[d] = HyperCoor[d];

  Lexicographic::CoorFromIndexReversed(ShmCoor, WorldShmRank, ShmDims);
  for(int d=0;d<ndimension;d++) WorldCoor[d] = NodeCoor[d]*ShmDims[d]+ShmCoor[d];
  Lexicographic::IndexFromCoorReversed(WorldCoor, rank, WorldDims);
  int ierr = MPI_Comm_split(WorldComm, 0, rank, &optimal_comm);
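  // Every rank passes the same colour (0), so the split does not partition
  // the communicator; it only reorders it. Using the freshly computed 'rank'
  // as the key renumbers the world so that shared-memory neighbours are
  // innermost and hypercube neighbours land on nearby nodes.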
  int ndimension = processors.size();

  for(int d=0;d<ndimension;d++){
    NodeDims[d] = WorldDims[d]/ShmDims[d];
  }

  for(int i=0;i<ndimension;i++){
    Nprocessors*=processors[i];
  }
  Lexicographic::CoorFromIndexReversed(NodeCoor, WorldNode, NodeDims);
  Lexicographic::CoorFromIndexReversed(ShmCoor, WorldShmRank, ShmDims);
  for(int d=0;d<ndimension;d++) WorldCoor[d] = NodeCoor[d]*ShmDims[d]+ShmCoor[d];
  Lexicographic::IndexFromCoorReversed(WorldCoor, rank, WorldDims);
  int ierr = MPI_Comm_split(WorldComm, 0, rank, &optimal_comm);
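  // Same single-colour split as the hypercube path: the recomputed 'rank'
  // key makes each node's ShmDims block of ranks contiguous, so the
  // intra-node shared-memory fast path lines up with the cartesian
  // decomposition.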
#ifdef GRID_MPI3_SHMGET
  std::cout << Mheader "SharedMemoryAllocate "<< bytes << " shmget implementation "<<std::endl;
  key_t key = IPC_PRIVATE;
  int flags = IPC_CREAT | SHM_R | SHM_W;

  if ((shmids[r] = shmget(key, size, flags)) == -1) {
    int errsv = errno;
    printf("Errno %d\n", errsv);
    printf("key   %d\n", key);
    printf("size  %ld\n", size);
    printf("flags %d\n", flags);
  perror("Shared memory attach failure");
  shmctl(shmids[r], IPC_RMID, NULL);

  shmctl(shmids[r], IPC_RMID, (struct shmid_ds *)NULL);
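  // IPC_RMID marks the System V segment for destruction as soon as the last
  // attached process detaches or exits; issuing it right after attach means
  // a crashed job cannot leak segments that would otherwise need manual
  // cleanup with ipcrm.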
#if defined(GRID_CUDA) || defined(GRID_HIP) || defined(GRID_SYCL)

#ifndef ACCELERATOR_AWARE_MPI
  HostCommBuf = malloc(bytes);

  if ( ShmCommBuf == (void *)NULL ) {
    std::cerr << "SharedMemoryMPI.cc acceleratorAllocDevice failed NULL pointer for " << bytes << " bytes " << std::endl;
  std::cout << Mheader " acceleratorAllocDevice " << bytes
            << " bytes at " << std::hex << ShmCommBuf << " - " << (bytes-1+(uint64_t)ShmCommBuf) << std::dec
            << " for comms buffers " << std::endl;

  std::cout << Mheader "Setting up IPC" << std::endl;
#ifndef GRID_MPI3_SHM_NONE

  void * thisBuf = ShmCommBuf;

#ifdef GRID_SYCL_LEVEL_ZERO_IPC
  typedef struct { int fd; pid_t pid; ze_ipc_mem_handle_t ze; } clone_mem_t;
  auto zeDevice  = sycl::get_native<sycl::backend::ext_oneapi_level_zero>(theGridAccelerator->get_device());
  auto zeContext = sycl::get_native<sycl::backend::ext_oneapi_level_zero>(theGridAccelerator->get_context());

  ze_ipc_mem_handle_t ihandle;

  auto err = zeMemGetIpcHandle(zeContext, ShmCommBuf, &ihandle);
  if ( err != ZE_RESULT_SUCCESS ) {
    std::cerr << "SharedMemoryMPI.cc zeMemGetIpcHandle failed for rank "<<r<<" "<<std::hex<<err<<std::dec<<std::endl;
  memcpy((void *)&handle.fd, (void *)&ihandle, sizeof(int));
  handle.pid = getpid();
  memcpy((void *)&handle.ze, (void *)&ihandle, sizeof(ihandle));

  UnixSockets::SendFileDescriptor(handle.fd, rr);
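  // The raw descriptor number inside handle.fd is only meaningful in this
  // process; broadcasting the struct does not transfer the descriptor
  // itself. The socket send above (SCM_RIGHTS) is what makes the kernel
  // install an equivalent descriptor in each receiving rank.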
  cudaIpcMemHandle_t handle;

  auto err = cudaIpcGetMemHandle(&handle, ShmCommBuf);
  if ( err != cudaSuccess) {
    std::cerr << " SharedMemoryMPI.cc cudaIpcGetMemHandle failed for rank " << r << " " << cudaGetErrorString(err) << std::endl;
  hipIpcMemHandle_t handle;

  auto err = hipIpcGetMemHandle(&handle, ShmCommBuf);
  if ( err != hipSuccess) {
    std::cerr << " SharedMemoryMPI.cc hipIpcGetMemHandle failed for rank " << r << " " << hipGetErrorString(err) << std::endl;
  int ierr=MPI_Bcast(&handle, sizeof(handle), MPI_BYTE, r, WorldShmComm);
#ifdef GRID_SYCL_LEVEL_ZERO_IPC

  myfd = UnixSockets::RecvFileDescriptor();

  int pidfd = syscall(SYS_pidfd_open, handle.pid, 0);

  myfd = syscall(438, pidfd, handle.fd, 0); // 438 == SYS_pidfd_getfd
  fprintf(stderr, "pidfd_getfd returned %d errno was %d\n", myfd, err_t); fflush(stderr);
  perror("pidfd_getfd failed ");
  // Rebuild the Level Zero handle: start from the broadcast copy, then
  // patch in the locally valid descriptor obtained above
  memcpy((void *)&ihandle, (void *)&handle.ze, sizeof(ihandle));
  memcpy((void *)&ihandle, (void *)&myfd, sizeof(int));
  auto err = zeMemOpenIpcHandle(zeContext, zeDevice, ihandle, 0, &thisBuf);
  if ( err != ZE_RESULT_SUCCESS ) {
    std::cerr << "SharedMemoryMPI.cc "<<zeContext<<" "<<zeDevice<<std::endl;
    std::cerr << "SharedMemoryMPI.cc zeMemOpenIpcHandle failed for rank "<<r<<" "<<std::hex<<err<<std::dec<<std::endl;
  }
  assert(thisBuf != nullptr);
  auto err = cudaIpcOpenMemHandle(&thisBuf, handle, cudaIpcMemLazyEnablePeerAccess);
  if ( err != cudaSuccess) {
    std::cerr << " SharedMemoryMPI.cc cudaIpcOpenMemHandle failed for rank " << r << " " << cudaGetErrorString(err) << std::endl;
  auto err = hipIpcOpenMemHandle(&thisBuf, handle, hipIpcMemLazyEnablePeerAccess);
  if ( err != hipSuccess) {
    std::cerr << " SharedMemoryMPI.cc hipIpcOpenMemHandle failed for rank " << r << " " << hipGetErrorString(err) << std::endl;
#ifdef GRID_MPI3_SHMMMAP
  std::cout << Mheader "SharedMemoryAllocate "<< bytes << " MMAP implementation "<< GRID_SHM_PATH << std::endl;
  char shm_name[NAME_MAX];

  int fd = open(shm_name, O_RDWR|O_CREAT, 0666);
  if ( fd == -1 ) {
    printf("open %s failed\n", shm_name);
    perror("open hugetlbfs");
    exit(0);
  }
  int mmap_flag = MAP_SHARED;
#ifdef MAP_POPULATE
  mmap_flag |= MAP_POPULATE;
#endif
#ifdef MAP_HUGETLB
  if ( flags ) mmap_flag |= MAP_HUGETLB;
#endif
  void *ptr = (void *) mmap(NULL, bytes, PROT_READ | PROT_WRITE, mmap_flag, fd, 0);
  if ( ptr == (void *)MAP_FAILED ) {
    printf("mmap %s failed\n", shm_name);
    perror("failed mmap");
    assert(0);
  }
  assert(((uint64_t)ptr&0x3F)==0);
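  // mmap returns page-aligned addresses, so the low six bits are zero
  // whenever the call succeeded; the assert is a cheap guard that the buffer
  // really is at least 64-byte (cache-line) aligned before it is used for
  // comms.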
  std::cout << Mheader " Intra-node IPC setup is complete " << std::endl;
#ifdef GRID_MPI3_SHM_NONE
  std::cout << Mheader "SharedMemoryAllocate "<< bytes << " MMAP anonymous implementation "<<std::endl;
  char shm_name[NAME_MAX];

  int fd = -1; // MAP_ANONYMOUS ignores the descriptor
  int mmap_flag = MAP_SHARED|MAP_ANONYMOUS;
#ifdef MAP_POPULATE
  mmap_flag |= MAP_POPULATE;
#endif
#ifdef MAP_HUGETLB
  if ( flags ) mmap_flag |= MAP_HUGETLB;
#endif
  void *ptr = (void *) mmap(NULL, bytes, PROT_READ | PROT_WRITE, mmap_flag, fd, 0);
  if ( ptr == (void *)MAP_FAILED ) {
    printf("mmap %s failed\n", shm_name);
    perror("failed mmap");
    assert(0);
  }
  assert(((uint64_t)ptr&0x3F)==0);
#ifdef GRID_MPI3_SHMOPEN
  std::cout << Mheader "SharedMemoryAllocate "<< bytes << " SHMOPEN implementation "<<std::endl;
  char shm_name[NAME_MAX];

  struct passwd *pw = getpwuid(getuid());
  snprintf(shm_name, NAME_MAX, "/Grid_%s_mpi3_shm_%d_%d", pw->pw_name, WorldNode, r);

  shm_unlink(shm_name);
  int fd = shm_open(shm_name, O_RDWR|O_CREAT, 0666);
  if ( fd < 0 ) { perror("failed shm_open"); assert(0); }
  int mmap_flag = MAP_SHARED;
#ifdef MAP_POPULATE
  mmap_flag |= MAP_POPULATE;
#endif
#ifdef MAP_HUGETLB
  if (flags) mmap_flag |= MAP_HUGETLB;
#endif
  void * ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, mmap_flag, fd, 0);

  if ( ptr == (void *)MAP_FAILED ) {
    perror("failed mmap");
  }

  assert(((uint64_t)ptr&0x3F)==0);
  size_t size = bytes;

  char shm_name[NAME_MAX];
  struct passwd *pw = getpwuid(getuid());
  snprintf(shm_name, NAME_MAX, "/Grid_%s_mpi3_shm_%d_%d", pw->pw_name, WorldNode, r);

  int fd = shm_open(shm_name, O_RDWR, 0666);
  if ( fd < 0 ) { perror("failed shm_open"); assert(0); }

  void * ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if ( ptr == MAP_FAILED ) { perror("failed mmap"); assert(0); }
  assert(((uint64_t)ptr&0x3F)==0);
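  // Two-phase POSIX shm pattern: each rank first shm_open()s its own segment
  // with O_CREAT (above), and after synchronisation every rank re-opens each
  // peer's segment by name without O_CREAT and maps it, leaving all ranks
  // with read-write views of every node-local buffer.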
#if defined(GRID_CUDA) || defined(GRID_HIP) || defined(GRID_SYCL)

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
#ifndef GRID_MPI3_SHM_NONE
  MPI_Comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &ShmComm);
#else
  MPI_Comm_split(comm, rank, 0, &ShmComm);
#endif

  MPI_Allreduce(MPI_IN_PLACE, &wsr, 1, MPI_UINT32_T, MPI_SUM, ShmComm);
#ifndef ACCELERATOR_AWARE_MPI
  HostCommBuf = GlobalSharedMemory::HostCommBuf;
  MPI_Group FullGroup, ShmGroup;
  MPI_Comm_group(comm, &FullGroup);
  MPI_Comm_group(ShmComm, &ShmGroup);

  std::vector<int> ranks(size);
  for(int r=0;r<size;r++) ranks[r]=r;
  MPI_Group_translate_ranks(FullGroup, size, &ranks[0], ShmGroup, &ShmRanks[0]);
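  // MPI_Group_translate_ranks maps each world rank into the shared-memory
  // group; ranks living on other nodes come back as MPI_UNDEFINED, which is
  // exactly the sentinel tested below to choose between the shared-memory
  // fast path and ordinary MPI point-to-point.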
#ifdef GRID_SHM_FORCE_MPI
  for(int r=0;r<size;r++){
  // Two passes over the node's buffers: each rank stamps a pattern, then
  // every rank verifies what its peers wrote
  uint64_t magic = 0x5A5A5A;

  for(uint64_t r=0;r<ShmSize;r++){

  for(uint64_t r=0;r<ShmSize;r++){

  assert(check[2]==magic);
  std::cout << GridLogDebug << " SharedMemoryTest has passed "<<std::endl;
  if (gpeer == MPI_UNDEFINED){

  if (gpeer == MPI_UNDEFINED){

  uint64_t remote = (uint64_t)ShmCommBufs[gpeer]+offset;

  return (void *) remote;
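  // Translation is pure address arithmetic: the offset (computed from
  // local_p relative to this rank's own buffer) is added to the peer's
  // mapped base. This works because each peer's buffer is mapped
  // contiguously at ShmCommBufs[gpeer] with identical layout on every rank.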
  int MPI_is_finalised; MPI_Finalized(&MPI_is_finalised);
  if ( !MPI_is_finalised ) {
// Cross-referenced declarations used by this file (from the surrounding
// Grid headers):
//
//   void *acceleratorAllocDevice(size_t bytes);
//   void acceleratorCopyToDevice(void *from, void *to, size_t bytes);
//   void acceleratorMemSet(void *base, int value, size_t bytes);
//   void acceleratorCopyFromDevice(void *from, void *to, size_t bytes);
//   AcceleratorVector<int, MaxDims> Coordinate;
//   Out accelerator_inline binary(Input1 src_1, Input2 src_2, Operation op);
//   GridLogger GridLogDebug(1, "Debug", GridLogColours, "PURPLE");
//   #define NAMESPACE_BEGIN(A)
//   int BinaryToGray(int binary);
//   int Log2Size(int TwoToPower, int MAXLOG2);
//   accelerator_inline size_type size(void) const;
//   static void SharedMemoryAllocate(uint64_t bytes, int flags);
//   static const int MAXLOG2RANKSPERNODE;
//   static void OptimalCommunicatorHypercube(const Coordinate &processors, Grid_MPI_Comm &optimal_comm, Coordinate &ShmDims);
//   static Grid_MPI_Comm WorldComm;
//   static uint64_t _ShmAllocBytes;
//   static std::vector<int> WorldShmRanks;
//   static uint64_t ShmAllocBytes(void);
//   static void Init(Grid_MPI_Comm comm);
//   static void GetShmDims(const Coordinate &WorldDims, Coordinate &ShmDims);
//   static void SharedMemoryZero(void *dest, size_t bytes);
//   static void OptimalCommunicatorSharedMemory(const Coordinate &processors, Grid_MPI_Comm &optimal_comm, Coordinate &ShmDims);
//   static std::vector<void *> WorldShmCommBufs;
//   static int ShmAlloc(void);
//   static Grid_MPI_Comm WorldShmComm;
//   static void OptimalCommunicator(const Coordinate &processors, Grid_MPI_Comm &optimal_comm, Coordinate &ShmDims);
//   void *ShmBufferTranslate(int rank, void *local_p);
//   std::vector<int> ShmRanks;
//   void *ShmBuffer(int rank);
//   void ShmBufferFreeAll(void);
//   void SharedMemoryTest(void);
//   void SetCommunicator(Grid_MPI_Comm comm);
//   std::vector<void *> ShmCommBufs;