CFD Online Discussion Forums (https://www.cfd-online.com/Forums/)
-   OpenFOAM Programming & Development (https://www.cfd-online.com/Forums/openfoam-programming-development/)
-   -   How does the non-blocking communication overlap computation with communication? (https://www.cfd-online.com/Forums/openfoam-programming-development/164505-how-does-non-blocking-communication-overlap-computation-communication.html)

zhulianhua December 23, 2015 10:59

How does the non-blocking communication overlap computation with communication?
 
I'm investigating the parallel mechanism of OpenFOAM.
I found the following code snippets that do the message-passing job.

Code:

// Correct the boundary conditions
template<class Type, template<class> class PatchField, class GeoMesh>
void Foam::GeometricField<Type, PatchField, GeoMesh>::
correctBoundaryConditions()
{
    this->setUpToDate();
    storeOldTimes();
    boundaryField_.evaluate();
}

template<class Type, template<class> class PatchField, class GeoMesh>
void Foam::GeometricField<Type, PatchField, GeoMesh>::GeometricBoundaryField::
evaluate()
{
    if (debug)
    {
        Info<< "GeometricField<Type, PatchField, GeoMesh>::"
              "GeometricBoundaryField::"
              "evaluate()" << endl;
    }

    if
    (
        Pstream::defaultCommsType == Pstream::blocking
    || Pstream::defaultCommsType == Pstream::nonBlocking
    )
    {
        label nReq = Pstream::nRequests();

        forAll(*this, patchi)  // process each boundary patch
        {
            this->operator[](patchi).initEvaluate(Pstream::defaultCommsType);
        }

        // Block for any outstanding requests
        if
        (
            Pstream::parRun()
        && Pstream::defaultCommsType == Pstream::nonBlocking
        )
        {
            Pstream::waitRequests(nReq);
        }

        forAll(*this, patchi)
        {
            this->operator[](patchi).evaluate(Pstream::defaultCommsType);
        }
    }
    else if (Pstream::defaultCommsType == Pstream::scheduled)
    {
        const lduSchedule& patchSchedule =
            bmesh_.mesh().globalData().patchSchedule();

        forAll(patchSchedule, patchEvali)
        {
            if (patchSchedule[patchEvali].init)
            {
                this->operator[](patchSchedule[patchEvali].patch)
                    .initEvaluate(Pstream::scheduled);
            }
            else
            {
                this->operator[](patchSchedule[patchEvali].patch)
                    .evaluate(Pstream::scheduled);
            }
        }
    }
    else
    {
        FatalErrorIn("GeometricBoundaryField::evaluate()")
            << "Unsuported communications type "
            << Pstream::commsTypeNames[Pstream::defaultCommsType]
            << exit(FatalError);
    }
}

template<class Type>
void Foam::processorFvPatchField<Type>::initEvaluate
(
    const Pstream::commsTypes commsType
)
{
    if (Pstream::parRun())
    {
        this->patchInternalField(sendBuf_);

        if (commsType == Pstream::nonBlocking && !Pstream::floatTransfer) // fast path only without float compression
        {
            // Fast path. Receive into *this
            this->setSize(sendBuf_.size());
            outstandingRecvRequest_ = UPstream::nRequests();
            UIPstream::read
            (
                Pstream::nonBlocking,
                procPatch_.neighbProcNo(),
                reinterpret_cast<char*>(this->begin()),
                this->byteSize(),
                procPatch_.tag(),
                procPatch_.comm()
            );

            outstandingSendRequest_ = UPstream::nRequests();
            UOPstream::write
            (
                Pstream::nonBlocking,
                procPatch_.neighbProcNo(),
                reinterpret_cast<const char*>(sendBuf_.begin()),
                this->byteSize(),
                procPatch_.tag(),
                procPatch_.comm()
            );
        }
        else
        {
            procPatch_.compressedSend(commsType, sendBuf_);
        }
    }
}


template<class Type>
void Foam::processorFvPatchField<Type>::evaluate
(
    const Pstream::commsTypes commsType
)
{
    if (Pstream::parRun())
    {
        if (commsType == Pstream::nonBlocking && !Pstream::floatTransfer)
        {
            // Fast path. Received into *this

            if
            (
                outstandingRecvRequest_ >= 0
            && outstandingRecvRequest_ < Pstream::nRequests()
            )
            {
                UPstream::waitRequest(outstandingRecvRequest_);
            }
            outstandingSendRequest_ = -1;
            outstandingRecvRequest_ = -1;
        }
        else
        {
            procPatch_.compressedReceive<Type>(commsType, *this); // receive directly into this field
        }

        if (doTransform())
        {
            transform(*this, procPatch_.forwardT(), *this);
        }
    }
}

We can see that, with non-blocking communication, the only thing executed between this->operator[](patchi).initEvaluate() and this->operator[](patchi).evaluate() is Pstream::waitRequests(), nothing else!
For non-blocking communication, processorFvPatchField::initEvaluate() posts the sends/receives, and processorFvPatchField::evaluate() waits until the communication is done.
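
As far as I understand it, under nonBlocking the fast path above boils down to the standard MPI pattern below: post the receive and the send, then wait on both. Here is a minimal self-contained sketch (the ring neighbours, buffer size, and tag are made up for illustration; they are not OpenFOAM API):

Code:

#include <mpi.h>

#include <vector>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Illustrative ring neighbours; a processor patch couples two fixed ranks
    const int next = (rank + 1) % size;
    const int prev = (rank + size - 1) % size;
    const int n = 100;

    std::vector<double> sendBuf(n, double(rank)), recvBuf(n);
    MPI_Request reqs[2];

    // initEvaluate() stage: post the receive and the send, then return
    MPI_Irecv(recvBuf.data(), n, MPI_DOUBLE, prev, 0, MPI_COMM_WORLD, &reqs[0]);
    MPI_Isend(sendBuf.data(), n, MPI_DOUBLE, next, 0, MPI_COMM_WORLD, &reqs[1]);

    // waitRequests() stage: block until both requests complete.
    // Note: no computation happens between the posts and this wait.
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);

    // evaluate() stage: recvBuf is now safe to use (transform, etc.)

    MPI_Finalize();
    return 0;
}
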

I thought the bulk internal-field operations should be placed between processorFvPatchField::initEvaluate() and processorFvPatchField::evaluate() to "overlap the computation with communication". What is wrong with my interpretation of the OpenFOAM code?
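
What I expected instead is something like this sketch, where the bulk interior work that needs no neighbour data runs between posting the requests and waiting on them (again plain MPI for illustration; the interior array and the summation loop are invented, not taken from OpenFOAM):

Code:

#include <mpi.h>

#include <cstdio>
#include <vector>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int next = (rank + 1) % size;
    const int prev = (rank + size - 1) % size;
    const int n = 100;

    std::vector<double> sendBuf(n, 1.0), halo(n);
    std::vector<double> interior(1000000, 1.0);  // bulk cells, no halo needed
    MPI_Request reqs[2];

    // 1. initEvaluate() stage: post the exchange and return immediately
    MPI_Irecv(halo.data(), n, MPI_DOUBLE, prev, 0, MPI_COMM_WORLD, &reqs[0]);
    MPI_Isend(sendBuf.data(), n, MPI_DOUBLE, next, 0, MPI_COMM_WORLD, &reqs[1]);

    // 2. Overlap: work on interior cells while the messages are in flight
    double sum = 0.0;
    for (std::size_t i = 0; i < interior.size(); ++i)
    {
        sum += interior[i];
    }

    // 3. waitRequests()/evaluate() stage: wait, then consume the halo data
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE);
    for (int i = 0; i < n; ++i)
    {
        sum += halo[i];
    }

    if (rank == 0)
    {
        std::printf("sum = %g\n", sum);
    }

    MPI_Finalize();
    return 0;
}
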

Thanks.

Lianhua

qjh888 October 19, 2016 21:00

Hi There,

I've just been digging into this piece of code.
Currently, my code doesn't work in parallel.
After adding some debug flags, I traced the problem to this piece of code.

Have you made any progress on this?
Would you mind sharing your experience?
Thanks!
Janry

