00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifndef _OBJECTCOMMUNICATOR_HPP_
00037 #define _OBJECTCOMMUNICATOR_HPP_
00038
00039
00040 #include <boost/archive/binary_oarchive.hpp>
00041 #include <boost/archive/binary_iarchive.hpp>
00042
00043 #include <boost/scoped_array.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/serialization/shared_ptr.hpp>
00046 #include "PetscTools.hpp"
00047 #include "Exception.hpp"
00048
00049 const unsigned MAX_BUFFER_SIZE = 1000000;
00050
00056 template<typename CLASS>
00057 class ObjectCommunicator
00058 {
00059 private:
00060
00062 char* mRecvBuffer;
00063
00067 std::vector<char* > mSendBuffer;
00068
00072 std::vector<std::string> mSendString;
00073
00075 unsigned mRecvBufferLength;
00076
00078 unsigned mSendBufferLength;
00079
00081 MPI_Request mMpiRequest;
00082
00084 bool mIsWriting;
00085
00086 public:
00087
00091 ObjectCommunicator();
00092
00100 void SendObject(boost::shared_ptr<CLASS> const pObject, unsigned destinationProcess, unsigned tag);
00101
00109 void ISendObject(boost::shared_ptr<CLASS> const pObject, unsigned destinationProcess, unsigned tag);
00110
00120 boost::shared_ptr<CLASS> RecvObject(unsigned sourceProcess, unsigned tag, MPI_Status& status);
00121
00128 void IRecvObject(unsigned sourceProcess, unsigned tag);
00129
00135 boost::shared_ptr<CLASS> GetRecvObject();
00136
00149 boost::shared_ptr<CLASS> SendRecvObject(boost::shared_ptr<CLASS> const pSendObject, unsigned destinationProcess, unsigned sendTag, unsigned sourceProcess, unsigned sourceTag, MPI_Status& status);
00150 };
00151
00152 #include <sstream>
00153 #include <string>
00154 #include <cstring>
00155
00156
00157 template<typename CLASS>
00158 ObjectCommunicator<CLASS>::ObjectCommunicator()
00159 : mIsWriting(false)
00160 {
00161 mSendBuffer.resize(PetscTools::GetNumProcs());
00162 mSendString.resize(PetscTools::GetNumProcs());
00163 }
00164
00165 template<typename CLASS>
00166 void ObjectCommunicator<CLASS>::SendObject(boost::shared_ptr<CLASS> const pObject, unsigned destinationProcess, unsigned tag)
00167 {
00168
00169 std::ostringstream ss(std::ios::binary);
00170 boost::archive::binary_oarchive output_arch(ss);
00171
00172 output_arch << pObject;
00173
00174 const std::string send_msg = ss.str();
00175
00176
00177 unsigned string_length = send_msg.size();
00178 MPI_Send(&string_length, 1, MPI_UNSIGNED, destinationProcess, tag, PetscTools::GetWorld());
00179
00180
00181
00182 char* send_buf = const_cast<char*>(send_msg.data());
00183 MPI_Send(send_buf, string_length, MPI_BYTE, destinationProcess, tag, PetscTools::GetWorld());
00184 }
00185
00186 template<typename CLASS>
00187 void ObjectCommunicator<CLASS>::ISendObject(boost::shared_ptr<CLASS> const pObject, unsigned destinationProcess, unsigned tag)
00188 {
00189 MPI_Request request;
00190
00191
00192 std::ostringstream ss(std::ios::binary);
00193 boost::archive::binary_oarchive output_arch(ss);
00194
00195 output_arch << pObject;
00196
00197 mSendString[destinationProcess] = ss.str();
00198 mSendBufferLength = mSendString[destinationProcess].size();
00199
00200
00201 assert(mSendBufferLength < MAX_BUFFER_SIZE);
00202
00203
00204
00205 mSendBuffer[destinationProcess] = const_cast<char*>(mSendString[destinationProcess].data());
00206 MPI_Isend(mSendBuffer[destinationProcess], mSendBufferLength, MPI_BYTE, destinationProcess, tag, PetscTools::GetWorld(), &request);
00207 MPI_Request_free(&request);
00208 }
00209
00210 template<typename CLASS>
00211 boost::shared_ptr<CLASS> ObjectCommunicator<CLASS>::RecvObject(unsigned sourceProcess, unsigned tag, MPI_Status& status)
00212 {
00213 unsigned string_length = 0;
00214 MPI_Recv(&string_length, 1, MPI_UNSIGNED, sourceProcess, tag, PetscTools::GetWorld(), &status);
00215
00216 char* recv_array = new char[string_length];
00217 MPI_Recv(recv_array, string_length, MPI_BYTE, sourceProcess , tag, PetscTools::GetWorld(), &status);
00218
00219
00220 std::string recv_string(recv_array, string_length);
00221 delete[] recv_array;
00222 std::istringstream ss(recv_string, std::ios::binary);
00223
00224 boost::shared_ptr<CLASS> p_recv_object(new CLASS);
00225 boost::archive::binary_iarchive input_arch(ss);
00226
00227 input_arch >> p_recv_object;
00228
00229 return p_recv_object;
00230 }
00231
00232 template<typename CLASS>
00233 void ObjectCommunicator<CLASS>::IRecvObject(unsigned sourceProcess, unsigned tag)
00234 {
00235 assert(!mIsWriting);
00236
00237 mIsWriting = true;
00238
00239 mRecvBuffer = new char[MAX_BUFFER_SIZE];
00240 MPI_Irecv(mRecvBuffer, MAX_BUFFER_SIZE, MPI_BYTE, sourceProcess, tag, PetscTools::GetWorld(), &mMpiRequest);
00241 }
00242
00243 template<typename CLASS>
00244 boost::shared_ptr<CLASS> ObjectCommunicator<CLASS>::GetRecvObject()
00245 {
00246 if (!mIsWriting)
00247 {
00248 EXCEPTION("No object to receive in ObjectCommunicator::GetRecvObject");
00249 }
00250
00251 MPI_Status return_status;
00252
00253 MPI_Wait(&mMpiRequest, &return_status);
00254
00255 int recv_size;
00256 MPI_Get_count(&return_status, MPI_BYTE, &recv_size);
00257
00258
00259 std::string recv_string(mRecvBuffer, recv_size);
00260 std::istringstream ss(recv_string, std::ios::binary);
00261
00262 boost::shared_ptr<CLASS> p_recv_object(new CLASS);
00263 boost::archive::binary_iarchive input_arch(ss);
00264
00265 input_arch >> p_recv_object;
00266
00267
00268 delete[] mRecvBuffer;
00269
00270 mIsWriting = false;
00271
00272 return p_recv_object;
00273 }
00274
00275 template<typename CLASS>
00276 boost::shared_ptr<CLASS> ObjectCommunicator<CLASS>::SendRecvObject(boost::shared_ptr<CLASS> const pSendObject, unsigned destinationProcess, unsigned sendTag, unsigned sourceProcess, unsigned sourceTag, MPI_Status& status)
00277 {
00278
00279 std::ostringstream oss(std::ios::binary);
00280 boost::archive::binary_oarchive output_arch(oss);
00281
00282 output_arch << pSendObject;
00283
00284 std::string send_msg = oss.str();
00285
00286
00287 unsigned send_string_length = send_msg.size();
00288 unsigned recv_string_length;
00289
00290 MPI_Sendrecv(&send_string_length, 1, MPI_UNSIGNED, destinationProcess, sendTag, &recv_string_length, 1, MPI_UNSIGNED, sourceProcess, sourceTag, PetscTools::GetWorld(), &status);
00291
00292 boost::scoped_array<char> recv_array(new char[recv_string_length]);
00293
00294
00295 char* send_buf = const_cast<char*>(send_msg.data());
00296 MPI_Sendrecv(send_buf, send_string_length, MPI_BYTE, destinationProcess, sendTag, recv_array.get(), recv_string_length, MPI_BYTE, sourceProcess, sourceTag, PetscTools::GetWorld(), &status);
00297
00298
00299 std::string recv_string(recv_array.get(), recv_string_length);
00300 std::istringstream iss(recv_string, std::ios::binary);
00301
00302 boost::shared_ptr<CLASS> p_recv_object(new CLASS);
00303 boost::archive::binary_iarchive input_arch(iss);
00304
00305 input_arch >> p_recv_object;
00306
00307 return p_recv_object;
00308 }
00309
00310 #endif // _OBJECTCOMMUNICATOR_HPP_