OOFEM 3.0
Loading...
Searching...
No Matches
dyncombuff.C
Go to the documentation of this file.
1/*
2 *
3 * ##### ##### ###### ###### ### ###
4 * ## ## ## ## ## ## ## ### ##
5 * ## ## ## ## #### #### ## # ##
6 * ## ## ## ## ## ## ## ##
7 * ## ## ## ## ## ## ## ##
8 * ##### ##### ## ###### ## ##
9 *
10 *
11 * OOFEM : Object Oriented Finite Element Code
12 *
13 * Copyright (C) 1993 - 2025 Borek Patzak
14 *
15 *
16 *
17 * Czech Technical University, Faculty of Civil Engineering,
18 * Department of Structural Mechanics, 166 29 Prague, Czech Republic
19 *
20 * This library is free software; you can redistribute it and/or
21 * modify it under the terms of the GNU Lesser General Public
22 * License as published by the Free Software Foundation; either
23 * version 2.1 of the License, or (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
28 * Lesser General Public License for more details.
29 *
30 * You should have received a copy of the GNU Lesser General Public
31 * License along with this library; if not, write to the Free Software
32 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
33 */
34
35#include <list>
36#include <algorithm>
37
38#include "dyncombuff.h"
39#include "mathfem.h"
40#include "error.h"
41
42namespace oofem {
43 CommunicationPacket :: CommunicationPacket(MPI_Comm comm, std::size_t size, int num) : MPIBuffer(max(size, __CommunicationPacket_DEFAULT_SIZE), false)
44{
45 this->EOF_Flag = false;
46 this->number = num;
47 // reserve space for packet header
48 curr_pos += givePackSize(comm, MPI_INT, 2);
49}
50
51
52CommunicationPacket :: CommunicationPacket(MPI_Comm comm, int num) : MPIBuffer(__CommunicationPacket_DEFAULT_SIZE, false)
53{
54 this->EOF_Flag = false;
55 this->number = num;
56 // reserve space for packet header
57 curr_pos += givePackSize(comm, MPI_INT, 2);
58}
59
60
61CommunicationPacket :: ~CommunicationPacket()
62{ }
63
64
65void
66CommunicationPacket :: init(MPI_Comm comm)
67{
68 MPIBuffer :: init();
69 this->EOF_Flag = false;
70 // reserve space for packet header
71 curr_pos += givePackSize(comm, MPI_INT, 2);
72}
73
74int
75CommunicationPacket :: iSend(MPI_Comm communicator, int dest, int tag)
76{
77 this->packHeader(communicator);
78 return ( MPI_Isend(this->buff, this->curr_pos, MPI_PACKED, dest, tag,
79 communicator, & this->request) == MPI_SUCCESS );
80}
81
82
83int
84CommunicationPacket :: iRecv(MPI_Comm communicator, int source, int tag, std::size_t count)
85{
86 if ( count ) {
87 if ( count >= this->size ) {
88 // reallocate itself
89 if ( this->resize(count) == 0 ) {
90 return 0;
91 }
92 }
93 }
94
95 return ( MPI_Irecv(this->buff, this->size, MPI_PACKED, source, tag,
96 communicator, & this->request) == MPI_SUCCESS );
97}
98
99
100int
101CommunicationPacket :: testCompletion() {
102 int flag;
103 MPI_Status status;
104
105 MPI_Test(& this->request, & flag, & status);
106 return flag;
107}
108
109int
110CommunicationPacket :: waitCompletion()
111{
112 MPI_Status status;
113
114 return ( MPI_Wait(& this->request, & status) == MPI_SUCCESS );
115}
116
117
118int
119CommunicationPacket :: packHeader(MPI_Comm comm)
120{
121 int _arry [ 2 ];
122 int _res, _pos = 0;
123
124 _arry [ 0 ] = this->number;
125 _arry [ 1 ] = this->EOF_Flag;
126
127 _res = MPI_Pack(_arry, 2, MPI_INT, this->buff, size, & _pos, comm);
128
129 return ( _res == MPI_SUCCESS );
130}
131
132int
133CommunicationPacket :: unpackHeader(MPI_Comm comm)
134{
135 int _arry [ 2 ];
136 int _res, _pos = 0;
137
138 _res = MPI_Unpack(this->buff, this->size, & _pos, _arry, 2, MPI_INT, comm);
139 this->number = _arry [ 0 ];
140 this->EOF_Flag = _arry [ 1 ];
141
142 return ( _res == MPI_SUCCESS );
143}
144
145
146
147/********DynamicCommunicationBuffer***************/
148
149CommunicationPacketPool DynamicCommunicationBuffer :: packetPool;
150
151 DynamicCommunicationBuffer :: DynamicCommunicationBuffer(MPI_Comm comm, std::size_t size, bool dynamic) :
152 CommunicationBuffer(comm, size, dynamic), packet_list()
153{
155 mode = DCB_null;
156 completed = false;
157 /*
158 * // alocate first send/receive packet
159 * active_packet = this->allocateNewPacket (++number_of_packets);
160 * packet_list.push_back(active_packet);
161 */
162}
163
164
165DynamicCommunicationBuffer :: DynamicCommunicationBuffer(MPI_Comm comm, bool dynamic) :
166 CommunicationBuffer(comm, dynamic), packet_list()
167{
169 mode = DCB_null;
170 completed = false;
171 /*
172 * // alocate first send/receive packet
173 * active_packet = this->allocateNewPacket (++number_of_packets);
174 * packet_list.push_back(active_packet);
175 */
176}
177
179DynamicCommunicationBuffer :: ~DynamicCommunicationBuffer()
180{
181 this->clear();
182}
183
184void
185DynamicCommunicationBuffer :: init()
186{
187 completed = false;
188 this->clear();
189}
190
191void
192DynamicCommunicationBuffer :: initForPacking()
193{
194 this->clear();
195
196 if ( !active_packet ) {
198 packet_list.push_back(active_packet);
199 }
200}
201
202void
203DynamicCommunicationBuffer :: initForUnpacking()
204{
205 recvIt = packet_list.begin();
206 this->popNewRecvPacket();
207}
208
209/*
210 * int
211 * DynamicCommunicationBuffer::write (int* src, int n)
212 * {
213 * int _result=1;
214 * int start_indx=0, end_indx, _size;
215 *
216 * do {
217 * _size = this->giveFitSize(MPI_INT, active_packet -> giveAvailableSpace(), n);
218 * end_indx = start_indx + _size;
219 *
220 * if (_size) _result &= active_packet -> write (communicator, src+start_indx,_size, MPI_INT);
221 * if (end_indx == n) break;
222 * // active packet full, allocate a new one
223 * active_packet = this->allocateNewPacket (++number_of_packets);
224 * packet_list.push_back(active_packet);
225 * start_indx = end_indx;
226 * } while (1);
227 *
228 * return _result;
229 * }
230 *
231 *
232 * int
233 * DynamicCommunicationBuffer::read (int* dest, int n)
234 * {
235 * int _result=1;
236 * int start_indx=0, end_indx, _size;
237 *
238 * do {
239 * _size = this->giveFitSize(MPI_INT, active_packet -> giveAvailableSpace(), n);
240 * end_indx = start_indx + _size;
241 *
242 * if (_size) _result &= active_packet->read (communicator,dest+start_indx,_size, MPI_INT);
243 * if (end_indx == n) break;
244 * // active packet exhausted, pop a new one
245 * this->popNewRecvPacket();
246 * start_indx = end_indx;
247 * } while (1);
248 *
249 * return _result;
250 * }
251 */
252
253int
254DynamicCommunicationBuffer :: iSend(int dest, int tag)
255{
256 int result = 1;
257
259 active_packet->setEOFFlag();
260
261 active_rank = dest;
262 active_tag = tag;
263 for ( auto &packet: packet_list ) {
264 result &= packet->iSend(communicator, dest, tag);
265 }
266
267 /*
268 * int _myrank;
269 * MPI_Comm_rank (communicator, &_myrank);
270 * fprintf (stderr,"[%d] sending to [%d] %d packets for tag %d\n", _myrank, dest, number_of_packets, tag);
271 * (*(packet_list.begin()))->dump();
272 */
273 mode = DCB_send;
274 completed = false;
275 return result;
276}
277
278
279
280int
281DynamicCommunicationBuffer :: iRecv(int source, int tag, std::size_t count)
282{
283 this->init();
285 // receive first packet, but it is probably not the last one
286 // create new first packet and init its receive
288 active_rank = source;
289 active_tag = tag;
291 completed = false;
292 return active_packet->iRecv(communicator, source, tag);
293}
294
295int DynamicCommunicationBuffer :: receiveCompleted()
296{
297 /*
298 * int _myrank;
299 * MPI_Comm_rank (communicator, &_myrank);
300 */
301 if ( completed ) {
302 return 1;
303 }
304
305 if ( active_packet->testCompletion() ) {
306 // active packet received, add it to the pool and unpach header info
307 active_packet->unpackHeader(communicator);
308
309 //fprintf (stderr, "[%d] received from [%d] packet no. %d\n", _myrank, active_rank, active_packet->getNumber());
310
312 if ( active_packet->hasEOFFlag() ) {
313 // last packet received; init for unpacking
314 this->initForUnpacking();
315
316 //fprintf (stderr,"[%d] received from [%d] %d packets for tag %d\n", _myrank, active_rank, number_of_packets, active_tag);
317 //active_packet->dump();
318 completed = true;
319 return 1;
320 } else {
321 // received next packet, but it is not the last one
322 // create new packet and init its receive
325 return 0;
326 }
327 } else {
328 // active packet not yet received
329 return 0;
330 }
331}
332
333int DynamicCommunicationBuffer :: sendCompleted()
334{
335 int result = 1;
336
337 if ( completed ) {
338 return 1;
339 }
340
341 for ( auto &packet: packet_list ) {
342 result &= packet->testCompletion();
343 }
344
345 completed = result;
346 return result;
347}
348
349int
350DynamicCommunicationBuffer :: testCompletion()
351{
352 if ( mode == DCB_send ) {
353 return this->sendCompleted();
354 } else if ( mode == DCB_receive ) {
355 return this->receiveCompleted();
356 } else {
357 return 0;
358 }
359}
360
361int
362DynamicCommunicationBuffer :: waitCompletion()
363{
364 if ( mode == DCB_send ) {
365 while ( !this->sendCompleted() ) { }
366
367 ;
368 return 1;
369 } else if ( mode == DCB_receive ) {
370 while ( !this->receiveCompleted() ) { }
371
372 ;
373 return 1;
374 }
375
376 return 0;
377}
378
379
380int
381DynamicCommunicationBuffer :: giveFitSize(MPI_Datatype type, int availableSpace, int arrySize)
382{
383 int arrySpace, guessSize;
384 MPI_Pack_size(arrySize, type, communicator, & arrySpace);
385 if ( availableSpace >= arrySpace ) {
386 return arrySize;
387 }
388
389 guessSize = ( int ) floor( ( ( double ) arrySize / ( double ) arrySpace ) * availableSpace ) + 1;
390 do {
391 guessSize--;
392 MPI_Pack_size(guessSize, type, communicator, & arrySpace);
393 } while ( ( availableSpace < arrySpace ) && ( guessSize > 0 ) );
394
395 return guessSize;
396}
397
399DynamicCommunicationBuffer :: allocateNewPacket(int n)
400{
401 CommunicationPacket *result = packetPool.popPacket(communicator);
402 result->init(communicator);
403 result->setNumber(n);
404 return result;
405}
406
407void
408DynamicCommunicationBuffer :: freePacket(CommunicationPacket *p)
409{
410 packetPool.pushPacket(p);
411}
412
413void
414DynamicCommunicationBuffer :: clear()
415{
416 for ( auto &packet: packet_list ) {
417 this->freePacket(packet);
418 }
419
420 packet_list.clear();
421 active_packet = NULL;
423}
424
425
426void
427DynamicCommunicationBuffer :: popNewRecvPacket()
428{
429 active_packet = ( * recvIt );
430 ++recvIt;
431 if ( active_packet == NULL ) {
432 OOFEM_ERROR("no more packets received");
433 }
434
435 //active_packet->init(communicator);
436}
437
438void
439DynamicCommunicationBuffer :: pushNewRecvPacket(CommunicationPacket *p)
440{
441 packet_list.push_back(p);
442}
443
444
445int
446DynamicCommunicationBuffer :: bcast(int root)
447{
448 OOFEM_ERROR("not implemented");
449 return 0;
450}
451
453
455CommunicationPacketPool :: popPacket(MPI_Comm comm)
456{
457 CommunicationPacket *result;
458
459 if ( available_packets.empty() ) {
460 // allocate new packet
461 if ( ( result = new CommunicationPacket(comm, 0) ) == NULL ) {
462 OOFEM_ERROR("allocation of new packed failed");
463 }
464
466 } else {
467 result = available_packets.front();
468 available_packets.pop_front();
469 freePackets--;
470 }
471
472#ifdef DEBUG
473 // add packet into list of leased packets
474 leased_packets.push_back(result);
475#endif
476
478 return result;
479}
480
481void
482CommunicationPacketPool :: pushPacket(CommunicationPacket *p)
483{
484#ifdef DEBUG
485 std :: list< CommunicationPacket * > :: iterator it = std :: find(leased_packets.begin(), leased_packets.end(), p);
486 if ( it != leased_packets.end() ) {
487 // found previosly leased one
488 leased_packets.erase(it);
489 available_packets.push_back(p);
490 } else {
491 OOFEM_ERROR("request to push strange packet (not allocated by pool)");
492 }
493
494#else
495 available_packets.push_back(p);
496#endif
497
499 freePackets++;
500}
501
502void
503CommunicationPacketPool :: clear()
504{
505 if ( !leased_packets.empty() ) {
506 OOFEM_WARNING("some packets still leased");
507 }
508
509 for ( auto &packet: available_packets ) {
510 if ( packet ) {
511 delete packet;
512 }
513 }
514
515 available_packets.clear();
517}
518
519
520void
521CommunicationPacketPool :: printInfo()
522{
523 OOFEM_LOG_INFO("CommunicationPacketPool: allocated %d packets\n(packet size: %d, %d leased, %d free)\n",
525}
526} // end namespace oofem
CommunicationBuffer(MPI_Comm comm, std::size_t size, bool dynamic=0)
Definition combuff.h:232
std ::list< CommunicationPacket * > available_packets
Definition dyncombuff.h:136
std ::list< CommunicationPacket * > leased_packets
Definition dyncombuff.h:137
int packHeader(MPI_Comm)
Packs packet header info at receiver beginning.
Definition dyncombuff.C:119
virtual void init(MPI_Comm comm)
Definition dyncombuff.C:66
enum oofem::DynamicCommunicationBuffer::DCB_Mode mode
std::list< CommunicationPacket * >::iterator recvIt
Iterator to iterate over received packets.
Definition dyncombuff.h:163
int active_tag
Active rank and tag (send by initSend,initReceive, and initExchange).
Definition dyncombuff.h:167
CommunicationPacket * active_packet
Active packet.
Definition dyncombuff.h:165
void initForUnpacking() override
Initialize for Unpacking (data already received).
Definition dyncombuff.C:203
void freePacket(CommunicationPacket *)
Definition dyncombuff.C:408
static CommunicationPacketPool packetPool
Static packet pool.
Definition dyncombuff.h:173
void pushNewRecvPacket(CommunicationPacket *)
Definition dyncombuff.C:439
bool completed
Communication completion flag.
Definition dyncombuff.h:175
std ::list< CommunicationPacket * > packet_list
Definition dyncombuff.h:161
CommunicationPacket * allocateNewPacket(int)
Definition dyncombuff.C:399
ComBuff_BYTE_TYPE * buff
Buffer. Dynamically allocated.
Definition combuff.h:78
std::size_t size
Size and current position in buffer in bytes (sizeof(char)).
Definition combuff.h:74
MPIBuffer(std::size_t size, bool dynamic=0)
Constructor. Creates buffer of given size, using given communicator for packing.
Definition combuff.C:46
MPI_Request request
Definition combuff.h:84
std::size_t curr_pos
Definition combuff.h:74
int givePackSize(MPI_Comm communicator, MPI_Datatype type, std::size_t size)
Definition combuff.C:226
int resize(std::size_t newSize)
Definition combuff.C:78
#define __CommunicationPacket_DEFAULT_SIZE
Definition dyncombuff.h:47
#define OOFEM_WARNING(...)
Definition error.h:80
#define OOFEM_ERROR(...)
Definition error.h:79
#define OOFEM_LOG_INFO(...)
Definition logger.h:143
FloatArrayF< N > max(const FloatArrayF< N > &a, const FloatArrayF< N > &b)

This page is part of the OOFEM-3.0 documentation. Copyright (C) 1994-2025 Bořek Patzák
Project e-mail: oofem@fsv.cvut.cz
Generated for OOFEM by doxygen 1.15.0 written by Dimitri van Heesch, © 1997-2011