Grid 0.7.0
Communicator_mpi3.cc
Go to the documentation of this file.
1/*************************************************************************************
2
3 Grid physics library, www.github.com/paboyle/Grid
4
5 Source file: ./lib/communicator/Communicator_mpi.cc
6
7 Copyright (C) 2015
8
9Author: Peter Boyle <paboyle@ph.ed.ac.uk>
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License along
22 with this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 See the full license in the file "LICENSE" in the top level distribution directory
26*************************************************************************************/
27/* END LEGAL */
28#include <Grid/GridCore.h>
30
32
33
35
37// First initialise of comms system
39void CartesianCommunicator::Init(int *argc, char ***argv)
40{
41
42 int flag;
43 int provided;
44
45 MPI_Initialized(&flag); // needed to coexist with other libs apparently
46 if ( !flag ) {
47
48#ifndef GRID_COMMS_THREADS
50 // wrong results here too
51 // For now: comms-overlap leads to wrong results in Benchmark_wilson even on single node MPI runs
52 // other comms schemes are ok
53 MPI_Init_thread(argc,argv,MPI_THREAD_SERIALIZED,&provided);
54#else
55 MPI_Init_thread(argc,argv,MPI_THREAD_MULTIPLE,&provided);
56#endif
57 //If only 1 comms thread we require any threading mode other than SINGLE, but for multiple comms threads we need MULTIPLE
58 if( (nCommThreads == 1) && (provided == MPI_THREAD_SINGLE) ) {
59 assert(0);
60 }
61
62 if( (nCommThreads > 1) && (provided != MPI_THREAD_MULTIPLE) ) {
63 assert(0);
64 }
65 }
66
67 // Never clean up as done once.
68 MPI_Comm_dup (MPI_COMM_WORLD,&communicator_world);
69
76}
77
79// Use cartesian communicators now even in MPI3
81void CartesianCommunicator::ShiftedRanks(int dim,int shift,int &source,int &dest)
82{
83 int ierr=MPI_Cart_shift(communicator,dim,shift,&source,&dest);
84 assert(ierr==0);
85}
87{
88 int rank;
89 int ierr=MPI_Cart_rank (communicator, &coor[0], &rank);
90 assert(ierr==0);
91 return rank;
92}
94{
95 coor.resize(_ndimension);
96 int ierr=MPI_Cart_coords (communicator, rank, _ndimension,&coor[0]);
97 assert(ierr==0);
98}
99
101// Initialises from communicator_world
104{
105 MPI_Comm optimal_comm;
107 // Remap using the shared memory optimising routine
108 // The remap creates a comm which must be freed
111 InitFromMPICommunicator(processors,optimal_comm);
112 SetCommunicator(optimal_comm);
114 // Free the temp communicator
116 MPI_Comm_free(&optimal_comm);
117}
118
120// Try to subdivide communicator
123{
124 _ndimension = processors.size(); assert(_ndimension>=1);
125 int parent_ndimension = parent._ndimension; assert(_ndimension >= parent._ndimension);
126 Coordinate parent_processor_coor(_ndimension,0);
127 Coordinate parent_processors (_ndimension,1);
128 Coordinate shm_processors (_ndimension,1);
129 // Can make 5d grid from 4d etc...
130 int pad = _ndimension-parent_ndimension;
131 for(int d=0;d<parent_ndimension;d++){
132 parent_processor_coor[pad+d]=parent._processor_coor[d];
133 parent_processors [pad+d]=parent._processors[d];
134 shm_processors [pad+d]=parent._shm_processors[d];
135 }
136
138 // split the communicator
140 // int Nparent = parent._processors ;
141 int Nparent;
142 MPI_Comm_size(parent.communicator,&Nparent);
143
144 int childsize=1;
145 for(int d=0;d<processors.size();d++) {
146 childsize *= processors[d];
147 }
148 int Nchild = Nparent/childsize;
149 assert (childsize * Nchild == Nparent);
150
151 Coordinate ccoor(_ndimension); // coor within subcommunicator
152 Coordinate scoor(_ndimension); // coor of split within parent
153 Coordinate ssize(_ndimension); // coor of split within parent
154
155 for(int d=0;d<_ndimension;d++){
156 ccoor[d] = parent_processor_coor[d] % processors[d];
157 scoor[d] = parent_processor_coor[d] / processors[d];
158 ssize[d] = parent_processors[d] / processors[d];
159 if ( processors[d] < shm_processors[d] ) shm_processors[d] = processors[d]; // subnode splitting.
160 }
161
162 // rank within subcomm ; srank is rank of subcomm within blocks of subcomms
163 int crank;
164 // Mpi uses the reverse Lexico convention to us; so reversed routines called
165 Lexicographic::IndexFromCoorReversed(ccoor,crank,processors); // processors is the split grid dimensions
166 Lexicographic::IndexFromCoorReversed(scoor,srank,ssize); // ssize is the number of split grids
167
168 MPI_Comm comm_split;
169 if ( Nchild > 1 ) {
170
172 // Split the communicator
174 int ierr= MPI_Comm_split(parent.communicator,srank,crank,&comm_split);
175 assert(ierr==0);
176
177 } else {
178 srank = 0;
179 int ierr = MPI_Comm_dup (parent.communicator,&comm_split);
180 assert(ierr==0);
181 }
182
184 // Set up from the new split communicator
186 InitFromMPICommunicator(processors,comm_split);
187
189 // Take the right SHM buffers
191 SetCommunicator(comm_split);
192
194 // Free the temp communicator
196 MPI_Comm_free(&comm_split);
197
198 if(0){
199 std::cout << " ndim " <<_ndimension<<" " << parent._ndimension << std::endl;
200 for(int d=0;d<processors.size();d++){
201 std::cout << d<< " " << _processor_coor[d] <<" " << ccoor[d]<<std::endl;
202 }
203 }
204 for(int d=0;d<processors.size();d++){
205 assert(_processor_coor[d] == ccoor[d] );
206 }
207}
208
209void CartesianCommunicator::InitFromMPICommunicator(const Coordinate &processors, MPI_Comm communicator_base)
210{
212 // Creates communicator, and the communicator_halo
214 _ndimension = processors.size();
216
218 // Count the requested nodes
220 _Nprocessors=1;
221 _processors = processors;
222 for(int i=0;i<_ndimension;i++){
224 }
225
226 Coordinate periodic(_ndimension,1);
227 MPI_Cart_create(communicator_base, _ndimension,&_processors[0],&periodic[0],0,&communicator);
228 MPI_Comm_rank(communicator,&_processor);
230
231 if ( 0 && (communicator_base != communicator_world) ) {
232 std::cout << "InitFromMPICommunicator Cartesian communicator created with a non-world communicator"<<std::endl;
233 std::cout << " new communicator rank "<<_processor<< " coor ["<<_ndimension<<"] ";
234 for(int d=0;d<_processors.size();d++){
235 std::cout << _processor_coor[d]<<" ";
236 }
237 std::cout << std::endl;
238 }
239
240 int Size;
241 MPI_Comm_size(communicator,&Size);
242
244 for(int i=0;i<_ndimension*2;i++){
245 MPI_Comm_dup(communicator,&communicator_halo[i]);
246 }
247 assert(Size==_Nprocessors);
248}
249
251{
252 int MPI_is_finalised;
253 MPI_Finalized(&MPI_is_finalised);
254 if (communicator && !MPI_is_finalised) {
255 MPI_Comm_free(&communicator);
256 for(int i=0;i<communicator_halo.size();i++){
257 MPI_Comm_free(&communicator_halo[i]);
258 }
259 }
260}
261#ifdef USE_GRID_REDUCTION
271#else
273 FlightRecorder::StepLog("AllReduce float");
274 int ierr=MPI_Allreduce(MPI_IN_PLACE,&f,1,MPI_FLOAT,MPI_SUM,communicator);
275 assert(ierr==0);
276}
278{
279 FlightRecorder::StepLog("AllReduce double");
280 int ierr = MPI_Allreduce(MPI_IN_PLACE,&d,1,MPI_DOUBLE,MPI_SUM,communicator);
281 assert(ierr==0);
282}
283#endif
285 FlightRecorder::StepLog("AllReduce uint32_t");
286 int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT32_T,MPI_SUM,communicator);
287 assert(ierr==0);
288}
290 FlightRecorder::StepLog("AllReduce uint64_t");
291 int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT64_T,MPI_SUM,communicator);
292 assert(ierr==0);
293}
295 FlightRecorder::StepLog("AllReduceVector");
296 int ierr=MPI_Allreduce(MPI_IN_PLACE,u,N,MPI_UINT64_T,MPI_SUM,communicator);
297 assert(ierr==0);
298}
300 int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT32_T,MPI_BXOR,communicator);
301 assert(ierr==0);
302}
304 FlightRecorder::StepLog("GlobalXOR");
305 int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT64_T,MPI_BXOR,communicator);
306 assert(ierr==0);
307}
309{
310 FlightRecorder::StepLog("GlobalMax");
311 int ierr=MPI_Allreduce(MPI_IN_PLACE,&f,1,MPI_FLOAT,MPI_MAX,communicator);
312 assert(ierr==0);
313}
315{
316 FlightRecorder::StepLog("GlobalMax");
317 int ierr = MPI_Allreduce(MPI_IN_PLACE,&d,1,MPI_DOUBLE,MPI_MAX,communicator);
318 assert(ierr==0);
319}
321{
322 FlightRecorder::StepLog("GlobalSumVector(float *)");
323 int ierr=MPI_Allreduce(MPI_IN_PLACE,f,N,MPI_FLOAT,MPI_SUM,communicator);
324 assert(ierr==0);
325}
327{
328 FlightRecorder::StepLog("GlobalSumVector(double *)");
329 int ierr = MPI_Allreduce(MPI_IN_PLACE,d,N,MPI_DOUBLE,MPI_SUM,communicator);
330 assert(ierr==0);
331}
332
333void CartesianCommunicator::SendToRecvFromBegin(std::vector<MpiCommsRequest_t> &list,
334 void *xmit,
335 int dest,
336 void *recv,
337 int from,
338 int bytes,int dir)
339{
340 MPI_Request xrq;
341 MPI_Request rrq;
342
343 assert(dest != _processor);
344 assert(from != _processor);
345
346 int tag;
347
348 tag= dir+from*32;
349 int ierr=MPI_Irecv(recv, bytes, MPI_CHAR,from,tag,communicator,&rrq);
350 assert(ierr==0);
351 list.push_back(rrq);
352
353 tag= dir+_processor*32;
354 ierr =MPI_Isend(xmit, bytes, MPI_CHAR,dest,tag,communicator,&xrq);
355 assert(ierr==0);
356 list.push_back(xrq);
357}
358void CartesianCommunicator::CommsComplete(std::vector<MpiCommsRequest_t> &list)
359{
360 int nreq=list.size();
361
362 if (nreq==0) return;
363
364 std::vector<MPI_Status> status(nreq);
365 int ierr = MPI_Waitall(nreq,&list[0],&status[0]);
366 assert(ierr==0);
367 list.resize(0);
368}
369
370// Basic Halo comms primitive
372 int dest,
373 void *recv,
374 int from,
375 int bytes)
376{
377 std::vector<MpiCommsRequest_t> reqs(0);
378
379 int myrank = _processor;
380 int ierr;
381
382 // Enforce no UVM in comms, device or host OK
383 assert(acceleratorIsCommunicable(xmit));
384 assert(acceleratorIsCommunicable(recv));
385
386 // Give the CPU to MPI immediately; can use threads to overlap optionally
387 // printf("proc %d SendToRecvFrom %d bytes Sendrecv \n",_processor,bytes);
388 ierr=MPI_Sendrecv(xmit,bytes,MPI_CHAR,dest,myrank,
389 recv,bytes,MPI_CHAR,from, from,
390 communicator,MPI_STATUS_IGNORE);
391 assert(ierr==0);
392
393}
394// Basic Halo comms primitive
396 int dest, int dox,
397 void *recv,
398 int from, int dor,
399 int bytes,int dir)
400{
401 std::vector<CommsRequest_t> list;
402 double offbytes = StencilSendToRecvFromPrepare(list,xmit,dest,dox,recv,from,dor,bytes,bytes,dir);
403 offbytes += StencilSendToRecvFromBegin(list,xmit,xmit,dest,dox,recv,recv,from,dor,bytes,bytes,dir);
405 return offbytes;
406}
408{
409 int grank = ShmRanks[rank];
410 if ( grank == MPI_UNDEFINED ) return true;
411 else return false;
412}
413
#ifdef ACCELERATOR_AWARE_MPI
// With accelerator-aware MPI the device buffers are handed directly to MPI,
// so the host-staging poll phases used by the pipeline path are no-ops here.
void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list) {};
void CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector<CommsRequest_t> &list) {};
// No staging buffers are required either: Prepare reports zero off-node
// bytes and all work happens in StencilSendToRecvFromBegin.
double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequest_t> &list,
							   void *xmit,
							   int dest,int dox,
							   void *recv,
							   int from,int dor,
							   int xbytes,int rbytes,int dir)
{
  return 0.0; // Do nothing -- no preparation required
}
426double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
427 void *xmit,void *xmit_comp,
428 int dest,int dox,
429 void *recv,void *recv_comp,
430 int from,int dor,
431 int xbytes,int rbytes,int dir)
432{
433 int ncomm =communicator_halo.size();
434 int commdir=dir%ncomm;
435
436 MPI_Request xrq;
437 MPI_Request rrq;
438
439 int ierr;
440 int gdest = ShmRanks[dest];
441 int gfrom = ShmRanks[from];
442 int gme = ShmRanks[_processor];
443
444 assert(dest != _processor);
445 assert(from != _processor);
446 assert(gme == ShmRank);
447 double off_node_bytes=0.0;
448 int tag;
449
450 if ( dor ) {
451 if ( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) {
452 tag= dir+from*32;
453 // std::cout << " StencilSendToRecvFrom "<<dir<<" MPI_Irecv "<<std::hex<<recv<<std::dec<<std::endl;
454 ierr=MPI_Irecv(recv_comp, rbytes, MPI_CHAR,from,tag,communicator_halo[commdir],&rrq);
455 assert(ierr==0);
456 list.push_back(rrq);
457 off_node_bytes+=rbytes;
458 }
459#ifdef NVLINK_GET
460 else {
461 void *shm = (void *) this->ShmBufferTranslate(from,xmit);
462 assert(shm!=NULL);
463 // std::cout << " StencilSendToRecvFrom "<<dir<<" CopyDeviceToDevice recv "<<std::hex<<recv<<" remote "<<shm <<std::dec<<std::endl;
465 }
466#endif
467 }
468 // This is a NVLINK PUT
469 if (dox) {
470 if ( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) {
471 tag= dir+_processor*32;
472 ierr =MPI_Isend(xmit_comp, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
473 assert(ierr==0);
474 list.push_back(xrq);
475 off_node_bytes+=xbytes;
476 } else {
477#ifndef NVLINK_GET
478 void *shm = (void *) this->ShmBufferTranslate(dest,recv);
479 assert(shm!=NULL);
481#endif
482 }
483 }
484 return off_node_bytes;
485}
486
487void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &list,int dir)
488{
489 int nreq=list.size();
490 /*finishes Get/Put*/
492
493 if (nreq==0) return;
494 std::vector<MPI_Status> status(nreq);
495 int ierr = MPI_Waitall(nreq,&list[0],&status[0]);
496 assert(ierr==0);
497 list.resize(0);
498 this->StencilBarrier();
499}
500
501#else /* NOT ... ACCELERATOR_AWARE_MPI */
503// Pipeline mode through host memory
505 /*
506 * In prepare (phase 1):
507 * PHASE 1: (prepare)
508 * - post MPI receive buffers asynch
509 * - post device - host send buffer transfer asynch
510 * PHASE 2: (Begin)
511 * - complete all copies
512 * - post MPI send asynch
513 * - post device - device transfers
514 * PHASE 3: (Complete)
515 * - MPI_waitall
516 * - host-device transfers
517 *
518 *********************************
519 * NB could split this further:
520 *--------------------------------
521 * PHASE 1: (Prepare)
522 * - post MPI receive buffers asynch
523 * - post device - host send buffer transfer asynch
524 * PHASE 2: (BeginInterNode)
525 * - complete all copies
526 * - post MPI send asynch
527 * PHASE 3: (BeginIntraNode)
528 * - post device - device transfers
529 * PHASE 4: (Complete)
530 * - MPI_waitall
531 * - host-device transfers asynch
532 * - (complete all copies)
533 */
// PHASE 1 of the host-staged (non accelerator-aware) pipeline:
// - post the MPI receive into a freshly allocated host bounce buffer,
// - start the asynch device->host copy of the send payload.
// The MPI_Isend itself is deferred to StencilSendToRecvFromPollDtoH, which
// fires it as soon as the device->host copy completes.
double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequest_t> &list,
							   void *xmit,
							   int dest,int dox,
							   void *recv,
							   int from,int dor,
							   int xbytes,int rbytes,int dir)
{
/*
 * Bring sequence from Stencil.h down to lower level.
 * Assume using XeLink is ok
 */
  int ncomm =communicator_halo.size();
  int commdir=dir%ncomm;   // round-robin over the duplicated halo communicators

  MPI_Request xrq;
  MPI_Request rrq;

  int ierr;
  int gdest = ShmRanks[dest];  // MPI_UNDEFINED => off-node neighbour
  int gfrom = ShmRanks[from];
  int gme = ShmRanks[_processor];

  assert(dest != _processor);
  assert(from != _processor);
  assert(gme == ShmRank);
  double off_node_bytes=0.0;
  int tag;

  void * host_recv = NULL;
  void * host_xmit = NULL;

  /*
   * PHASE 1: (Prepare)
   * - post MPI receive buffers asynch
   * - post device - host send buffer transfer asynch
   */

  if ( dor ) {
    if ( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) {
      tag= dir+from*32;
      // Receive lands in a host bounce buffer; PollIRecv copies it to the
      // device once MPI_Test reports completion.
      host_recv = this->HostBufferMalloc(rbytes);
      ierr=MPI_Irecv(host_recv, rbytes, MPI_CHAR,from,tag,communicator_halo[commdir],&rrq);
      assert(ierr==0);
      CommsRequest_t srq;
      srq.PacketType = InterNodeRecv;
      srq.bytes      = rbytes;
      srq.req        = rrq;
      srq.host_buf   = host_recv;
      srq.device_buf = recv;
      list.push_back(srq);
      off_node_bytes+=rbytes;
    }
  }

  if (dox) {
    if ( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) {

      tag= dir+_processor*32;

      host_xmit = this->HostBufferMalloc(xbytes);
      CommsRequest_t srq;

      // Stage the payload to the host; the event is polled in PollDtoH.
      srq.ev = acceleratorCopyFromDeviceAsynch(xmit, host_xmit,xbytes); // Make this Asynch

      //      ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
      //      assert(ierr==0);
      //      off_node_bytes+=xbytes;

      // Record everything PollDtoH needs to issue the MPI_Isend later.
      srq.PacketType = InterNodeXmit;
      srq.bytes      = xbytes;
      //      srq.req        = xrq;
      srq.host_buf   = host_xmit;
      srq.device_buf = xmit;
      srq.tag        = tag;
      srq.dest       = dest;
      srq.commdir    = commdir;
      list.push_back(srq);
    }
  }

  return off_node_bytes;
}
616/*
617 * In the interest of better pipelining, poll for completion on each DtoH and
618 * start MPI_ISend in the meantime
619 */
620void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list)
621{
622 int pending = 0;
623 do {
624
625 pending = 0;
626
627 for(int idx = 0; idx<list.size();idx++){
628
629 if ( list[idx].PacketType==InterNodeRecv ) {
630
631 int flag = 0;
632 MPI_Status status;
633 int ierr = MPI_Test(&list[idx].req,&flag,&status);
634 assert(ierr==0);
635
636 if ( flag ) {
637 // std::cout << " PollIrecv "<<idx<<" flag "<<flag<<std::endl;
638 acceleratorCopyToDeviceAsynch(list[idx].host_buf,list[idx].device_buf,list[idx].bytes);
639 list[idx].PacketType=InterNodeReceiveHtoD;
640 } else {
641 pending ++;
642 }
643 }
644 }
645 // std::cout << " PollIrecv "<<pending<<" pending requests"<<std::endl;
646 } while ( pending );
647
648}
649void CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector<CommsRequest_t> &list)
650{
651 int pending = 0;
652 do {
653
654 pending = 0;
655
656 for(int idx = 0; idx<list.size();idx++){
657
658 if ( list[idx].PacketType==InterNodeXmit ) {
659
660 if ( acceleratorEventIsComplete(list[idx].ev) ) {
661
662 void *host_xmit = list[idx].host_buf;
663 uint32_t xbytes = list[idx].bytes;
664 int dest = list[idx].dest;
665 int tag = list[idx].tag;
666 int commdir = list[idx].commdir;
668 // Send packet
670
671 // std::cout << " DtoH is complete for index "<<idx<<" calling MPI_Isend "<<std::endl;
672
673 MPI_Request xrq;
674 int ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
675 assert(ierr==0);
676
677 list[idx].req = xrq; // Update the MPI request in the list
678
679 list[idx].PacketType=InterNodeXmitISend;
680
681 } else {
682 // not done, so return to polling loop
683 pending++;
684 }
685 }
686 }
687 } while (pending);
688}
689
// PHASE 2 of the host-staged pipeline. Off-node traffic was already handled
// in Prepare/PollDtoH; this phase only issues the intra-node shared-memory
// device-to-device copies: a GET from the neighbour's send buffer when
// NVLINK_GET is defined, otherwise a PUT into the neighbour's receive buffer.
double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
							 void *xmit,void *xmit_comp,
							 int dest,int dox,
							 void *recv,void *recv_comp,
							 int from,int dor,
							 int xbytes,int rbytes,int dir)
{
  int ncomm =communicator_halo.size();
  int commdir=dir%ncomm;

  MPI_Request xrq;
  MPI_Request rrq;

  int ierr;
  int gdest = ShmRanks[dest]; // MPI_UNDEFINED => off-node neighbour
  int gfrom = ShmRanks[from];
  int gme = ShmRanks[_processor];

  assert(dest != _processor);
  assert(from != _processor);
  assert(gme == ShmRank);
  double off_node_bytes=0.0;
  int tag;

  void * host_xmit = NULL;

  // Receives already posted
  // Copies already started
  /*
   * PHASE 2: (Begin)
   * - complete all copies
   * - post MPI send asynch
   */
#ifdef NVLINK_GET
  if ( dor ) {

    if ( ! ( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) ) {
      // Intranode: pull the neighbour's send buffer through shared memory.
      void *shm = (void *) this->ShmBufferTranslate(from,xmit);
      assert(shm!=NULL);

      CommsRequest_t srq;

      srq.ev = acceleratorCopyDeviceToDeviceAsynch(shm,recv,rbytes);

      srq.PacketType = IntraNodeRecv;
      srq.bytes      = xbytes; // NOTE(review): receive path but records xbytes -- confirm (rbytes==xbytes for stencil halos?)
      //      srq.req        = xrq;
      srq.host_buf   = NULL;
      srq.device_buf = xmit;
      srq.tag        = -1;     // no MPI message involved
      srq.dest       = dest;
      srq.commdir    = dir;
      list.push_back(srq);
    }
  }
#else
  if (dox) {

    if ( !( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) ) {
      // Intranode: push straight into the neighbour's receive buffer.
      void *shm = (void *) this->ShmBufferTranslate(dest,recv);
      assert(shm!=NULL);

      CommsRequest_t srq;

      srq.ev = acceleratorCopyDeviceToDeviceAsynch(xmit,shm,xbytes);

      srq.PacketType = IntraNodeXmit;
      srq.bytes      = xbytes;
      //      srq.req        = xrq;
      srq.host_buf   = NULL;
      srq.device_buf = xmit;
      srq.tag        = -1;     // no MPI message involved
      srq.dest       = dest;
      srq.commdir    = dir;
      list.push_back(srq);

    }
  }
#endif
  return off_node_bytes;
}
// PHASE 3 (Complete): drain the asynch device copies, then wait on the MPI
// sends that PollDtoH launched. Receives need no Waitall here -- PollIRecv
// already observed their MPI_Test success and queued the HtoD copies.
void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &list,int dir)
{
  acceleratorCopySynchronise(); // Complete all pending copy transfers D2D

  std::vector<MPI_Status> status;
  std::vector<MPI_Request> MpiRequests;

  for(int r=0;r<list.size();r++){
    // Must check each Send buf is clear to reuse
    if ( list[r].PacketType == InterNodeXmitISend ) MpiRequests.push_back(list[r].req);
    //    if ( list[r].PacketType == InterNodeRecv ) MpiRequests.push_back(list[r].req); // Already "Test" passed
  }

  int nreq=MpiRequests.size();

  if (nreq>0) {
    status.resize(MpiRequests.size());
    int ierr = MPI_Waitall(MpiRequests.size(),&MpiRequests[0],&status[0]); // Sends are guaranteed in order. No harm in not completing.
    assert(ierr==0);
  }

  //  for(int r=0;r<nreq;r++){
  //    if ( list[r].PacketType==InterNodeRecv ) {
  //      acceleratorCopyToDeviceAsynch(list[r].host_buf,list[r].device_buf,list[r].bytes);
  //    }
  //  }

  list.resize(0);            // Delete the list
  this->HostBufferFreeAll(); // Clean up the buffer allocs
#ifndef NVLINK_GET
  this->StencilBarrier(); // if PUT must check our nbrs have filled our receive buffers.
#endif
}
809#endif
811// END PIPELINE MODE / NO CUDA AWARE MPI
813
815{
816 FlightRecorder::StepLog("NodeBarrier");
817 MPI_Barrier (ShmComm);
818}
819//void CartesianCommunicator::SendToRecvFromComplete(std::vector<CommsRequest_t> &list)
820//{
821//}
823{
824 FlightRecorder::StepLog("GridBarrier");
825 int ierr = MPI_Barrier(communicator);
826 assert(ierr==0);
827}
828void CartesianCommunicator::Broadcast(int root,void* data, int bytes)
829{
830 FlightRecorder::StepLog("Broadcast");
831 int ierr=MPI_Bcast(data,
832 bytes,
833 MPI_BYTE,
834 root,
836 assert(ierr==0);
837}
839 int r;
840 MPI_Comm_rank(communicator_world,&r);
841 return r;
842}
844 FlightRecorder::StepLog("BarrierWorld");
845 int ierr = MPI_Barrier(communicator_world);
846 assert(ierr==0);
847}
848void CartesianCommunicator::BroadcastWorld(int root,void* data, int bytes)
849{
850 FlightRecorder::StepLog("BroadcastWorld");
851 int ierr= MPI_Bcast(data,
852 bytes,
853 MPI_BYTE,
854 root,
856 assert(ierr==0);
857}
858
859void CartesianCommunicator::AllToAll(int dim,void *in,void *out,uint64_t words,uint64_t bytes)
860{
861 Coordinate row(_ndimension,1);
862 assert(dim>=0 && dim<_ndimension);
863
864 // Split the communicator
865 row[dim] = _processors[dim];
866
867 int me;
868 CartesianCommunicator Comm(row,*this,me);
869 Comm.AllToAll(in,out,words,bytes);
870}
871void CartesianCommunicator::AllToAll(void *in,void *out,uint64_t words,uint64_t bytes)
872{
873 FlightRecorder::StepLog("AllToAll");
874 // MPI is a pain and uses "int" arguments
875 // 64*64*64*128*16 == 500Million elements of data.
876 // When 24*4 bytes multiples get 50x 10^9 >>> 2x10^9 Y2K bug.
877 // (Turns up on 32^3 x 64 Gparity too)
878 MPI_Datatype object;
879 int iwords;
880 int ibytes;
881 iwords = words;
882 ibytes = bytes;
883 assert(words == iwords); // safe to cast to int ?
884 assert(bytes == ibytes); // safe to cast to int ?
885 MPI_Type_contiguous(ibytes,MPI_BYTE,&object);
886 MPI_Type_commit(&object);
887 MPI_Alltoall(in,iwords,object,out,iwords,object,communicator);
888 MPI_Type_free(&object);
889}
890
void acceleratorCopySynchronise(void)
int acceleratorIsCommunicable(void *ptr)
int acceleratorEventIsComplete(acceleratorEvent_t ev)
acceleratorEvent_t acceleratorCopyDeviceToDeviceAsynch(void *from, void *to, size_t bytes)
acceleratorEvent_t acceleratorCopyFromDeviceAsynch(void *from, void *to, size_t bytes)
acceleratorEvent_t acceleratorCopyToDeviceAsynch(void *from, void *to, size_t bytes)
bool Stencil_force_mpi
bool Stencil_force_mpi
AcceleratorVector< int, MaxDims > Coordinate
Definition Coordinate.h:95
void Grid_quiesce_nodes(void)
Definition Log.cc:109
void Grid_unquiesce_nodes(void)
Definition Log.cc:122
#define NAMESPACE_BEGIN(A)
Definition Namespace.h:35
#define NAMESPACE_END(A)
Definition Namespace.h:36
int Grid_MPI_Comm
int CommsRequest_t
accelerator_inline void resize(size_type sz)
Definition Coordinate.h:54
accelerator_inline size_type size(void) const
Definition Coordinate.h:52
double StencilSendToRecvFromPrepare(std::vector< CommsRequest_t > &list, void *xmit, int xmit_to_rank, int do_xmit, void *recv, int recv_from_rank, int do_recv, int xbytes, int rbytes, int dir)
void ProcessorCoorFromRank(int rank, Coordinate &coor)
CartesianCommunicator(const Coordinate &processors, const CartesianCommunicator &parent, int &srank)
void StencilSendToRecvFromComplete(std::vector< CommsRequest_t > &waitall, int i)
void Broadcast(int root, void *data, int bytes)
void StencilSendToRecvFromPollDtoH(std::vector< CommsRequest_t > &list)
static void Init(int *argc, char ***argv)
double StencilSendToRecvFrom(void *xmit, int xmit_to_rank, int do_xmit, void *recv, int recv_from_rank, int do_recv, int bytes, int dir)
void AllToAll(int dim, std::vector< T > &in, std::vector< T > &out)
void StencilSendToRecvFromPollIRecv(std::vector< CommsRequest_t > &list)
double StencilSendToRecvFromBegin(std::vector< CommsRequest_t > &list, void *xmit, void *xmit_comp, int xmit_to_rank, int do_xmit, void *recv, void *recv_comp, int recv_from_rank, int do_recv, int xbytes, int rbytes, int dir)
static void BarrierWorld(void)
void CommsComplete(std::vector< MpiCommsRequest_t > &list)
void GlobalSumVector(RealF *, int N)
void SendToRecvFrom(void *xmit, int xmit_to_rank, void *recv, int recv_from_rank, int bytes)
int RankFromProcessorCoor(Coordinate &coor)
static Grid_MPI_Comm communicator_world
static void BroadcastWorld(int root, void *data, int bytes)
void InitFromMPICommunicator(const Coordinate &processors, Grid_MPI_Comm communicator_base)
void SendToRecvFromBegin(std::vector< MpiCommsRequest_t > &list, void *xmit, int dest, void *recv, int from, int bytes, int dir)
void ShiftedRanks(int dim, int shift, int &source, int &dest)
std::vector< Grid_MPI_Comm > communicator_halo
static bool StepLog(const char *name)
static void SharedMemoryAllocate(uint64_t bytes, int flags)
static void Init(Grid_MPI_Comm comm)
static uint64_t MAX_MPI_SHM_BYTES
static void OptimalCommunicator(const Coordinate &processors, Grid_MPI_Comm &optimal_comm, Coordinate &ShmDims)
void * ShmBufferTranslate(int rank, void *local_p)
std::vector< int > ShmRanks
void SetCommunicator(Grid_MPI_Comm comm)
Grid_MPI_Comm ShmComm