/* Copyright (c) 2003-2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ #include #include #include #include #include #include #include "LongSignal.hpp" #include #include #include #include #include "VMSignal.hpp" #include #include "DataBuffer.hpp" /** * The instance */ SectionSegmentPool g_sectionSegmentPool; struct ConnectionError { enum TransporterError err; const char *text; }; static const ConnectionError connectionError[] = { { TE_NO_ERROR, "No error"}, { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"}, { (enum TransporterError) -1, "No connection error message available (please report a bug)"} }; const char *lookupConnectionError(Uint32 err) { int i= 0; while ((Uint32)connectionError[i].err != err && connectionError[i].err != -1) i++; return connectionError[i].text; } bool import(Ptr & first, const Uint32 * src, Uint32 len){ /** * Dummy data used when setting prev.m_nextSegment for first segment of a * section */ Uint32 dummyPrev[4]; first.p = 0; if(g_sectionSegmentPool.seize(first)){ ; } else { return false; } first.p->m_sz = len; first.p->m_ownerRef = 0; Ptr prevPtr = { (SectionSegment *)&dummyPrev[0], 0 }; Ptr currPtr = first; while(len > SectionSegment::DataLength){ prevPtr.p->m_nextSegment = currPtr.i; memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength); src += SectionSegment::DataLength; len -= SectionSegment::DataLength; prevPtr = currPtr; if(g_sectionSegmentPool.seize(currPtr)){ ; } else { first.p->m_lastSegment = prevPtr.i; return false; } } first.p->m_lastSegment = currPtr.i; currPtr.p->m_nextSegment = RNIL; memcpy(&currPtr.p->theData[0], src, 4 * len); return true; } void linkSegments(Uint32 head, Uint32 tail){ Ptr headPtr; g_sectionSegmentPool.getPtr(headPtr, head); Ptr tailPtr; g_sectionSegmentPool.getPtr(tailPtr, tail); Ptr oldTailPtr; g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment); headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment; headPtr.p->m_sz += tailPtr.p->m_sz; oldTailPtr.p->m_nextSegment = tailPtr.i; } void copy(Uint32 * & insertPtr, class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){ Uint32 len = _ptr.sz; SectionSegment * ptrP = _ptr.p; while(len > 60){ memcpy(insertPtr, &ptrP->theData[0], 4 * 60); len -= 60; insertPtr += 60; ptrP = thePool.getPtr(ptrP->m_nextSegment); } memcpy(insertPtr, &ptrP->theData[0], 4 * len); insertPtr += len; } void copy(Uint32 * dst, SegmentedSectionPtr src){ copy(dst, g_sectionSegmentPool, src); } void getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){ Uint32 tSec0 = ptr[0].i; Uint32 tSec1 = ptr[1].i; Uint32 tSec2 = ptr[2].i; SectionSegment * p; switch(secCount){ case 3: p = g_sectionSegmentPool.getPtr(tSec2); ptr[2].p = p; ptr[2].sz = p->m_sz; case 2: p = g_sectionSegmentPool.getPtr(tSec1); ptr[1].p = p; ptr[1].sz = p->m_sz; case 1: p = g_sectionSegmentPool.getPtr(tSec0); ptr[0].p = p; ptr[0].sz = p->m_sz; case 0: return; } char msg[40]; sprintf(msg, "secCount=%d", secCount); ErrorReporter::handleAssert(msg, __FILE__, __LINE__); } void getSection(SegmentedSectionPtr & ptr, Uint32 i){ ptr.i = i; SectionSegment * p = g_sectionSegmentPool.getPtr(i); ptr.p = p; ptr.sz = p->m_sz; } #define relSz(x) ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength) void release(SegmentedSectionPtr & ptr){ g_sectionSegmentPool.releaseList(relSz(ptr.sz), ptr.i, ptr.p->m_lastSegment); } void releaseSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){ Uint32 tSec0 = ptr[0].i; Uint32 tSz0 = ptr[0].sz; Uint32 tSec1 = ptr[1].i; Uint32 tSz1 = ptr[1].sz; Uint32 tSec2 = ptr[2].i; Uint32 tSz2 = ptr[2].sz; switch(secCount){ case 3: g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2, ptr[2].p->m_lastSegment); case 2: g_sectionSegmentPool.releaseList(relSz(tSz1), tSec1, ptr[1].p->m_lastSegment); case 1: g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0, ptr[0].p->m_lastSegment); case 0: return; } char msg[40]; sprintf(msg, "secCount=%d", secCount); ErrorReporter::handleAssert(msg, __FILE__, __LINE__); } #include void execute(void * callbackObj, SignalHeader * const header, Uint8 prio, Uint32 * const theData, LinearSectionPtr ptr[3]){ const Uint32 secCount = header->m_noOfSections; const Uint32 length = header->theLength; #ifdef TRACE_DISTRIBUTED ndbout_c("recv: %s(%d) from (%s, %d)", getSignalName(header->theVerId_signalNumber), header->theVerId_signalNumber, getBlockName(refToBlock(header->theSendersBlockRef)), refToNode(header->theSendersBlockRef)); #endif bool ok = true; Ptr secPtr[3]; switch(secCount){ case 3: ok &= import(secPtr[2], ptr[2].p, ptr[2].sz); case 2: ok &= import(secPtr[1], ptr[1].p, ptr[1].sz); case 1: ok &= import(secPtr[0], ptr[0].p, ptr[0].sz); } /** * Check that we haven't received a too long signal */ ok &= (length + secCount <= 25); Uint32 secPtrI[3]; if(ok){ /** * Normal path */ secPtrI[0] = secPtr[0].i; secPtrI[1] = secPtr[1].i; secPtrI[2] = secPtr[2].i; globalScheduler.execute(header, prio, theData, secPtrI); return; } /** * Out of memory */ for(Uint32 i = 0; im_lastSegment); } } Uint32 gsn = header->theVerId_signalNumber; Uint32 len = header->theLength; Uint32 newLen= (len > 22 ? 22 : len); SignalDroppedRep * rep = (SignalDroppedRep*)theData; memmove(rep->originalData, theData, (4 * newLen)); rep->originalGsn = gsn; rep->originalLength = len; rep->originalSectionCount = secCount; header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP; header->theLength = newLen + 3; header->m_noOfSections = 0; globalScheduler.execute(header, prio, theData, secPtrI); } NdbOut & operator<<(NdbOut& out, const SectionSegment & ss){ out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]"; return out; } void print(SectionSegment * s, Uint32 len, FILE* out){ for(Uint32 i = 0; itheData[i]); if(((i + 1) % 6) == 0) fprintf(out, "\n"); } } void print(SegmentedSectionPtr ptr, FILE* out){ ptr.p = g_sectionSegmentPool.getPtr(ptr.i); Uint32 len = ptr.p->m_sz; fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr.i, ptr.p, len, ptr.sz); while(len > SectionSegment::DataLength){ print(ptr.p, SectionSegment::DataLength, out); len -= SectionSegment::DataLength; fprintf(out, "ptr.i = %d\n", ptr.p->m_nextSegment); ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment); } print(ptr.p, len, out); fprintf(out, "\n"); } int checkJobBuffer() { /** * Check to see if jobbbuffers are starting to get full * and if so call doJob */ return globalScheduler.checkDoJob(); } void reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode, const char *info) { #ifdef DEBUG_TRANSPORTER ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "") #endif DBUG_ENTER("reportError"); DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s", nodeId, errorCode, info)); switch (errorCode) { case TE_SIGNAL_LOST_SEND_BUFFER_FULL: { char msg[64]; snprintf(msg, sizeof(msg), "Remote note id %d.%s%s", nodeId, info ? " " : "", info ? info : ""); ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL, msg, __FILE__, NST_ErrorHandler); } case TE_SIGNAL_LOST: { char msg[64]; snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId, info ? " " : "", info ? info : ""); ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST, msg, __FILE__, NST_ErrorHandler); } case TE_SHM_IPC_PERMANENT: { char msg[128]; snprintf(msg, sizeof(msg), "Remote node id %d.%s%s", nodeId, info ? " " : "", info ? info : ""); ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED, msg, __FILE__, NST_ErrorHandler); } default: break; } if(errorCode & TE_DO_DISCONNECT){ reportDisconnect(callbackObj, nodeId, errorCode); } SignalT<3> signalT; Signal &signal= *(Signal*)&signalT; memset(&signal.header, 0, sizeof(signal.header)); if(errorCode & TE_DO_DISCONNECT) signal.theData[0] = NDB_LE_TransporterError; else signal.theData[0] = NDB_LE_TransporterWarning; signal.theData[1] = nodeId; signal.theData[2] = errorCode; signal.header.theLength = 3; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP); DBUG_VOID_RETURN; } /** * Report average send length in bytes (4096 last sends) */ void reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){ SignalT<3> signalT; Signal &signal= *(Signal*)&signalT; memset(&signal.header, 0, sizeof(signal.header)); signal.header.theLength = 3; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.theData[0] = NDB_LE_SendBytesStatistic; signal.theData[1] = nodeId; signal.theData[2] = (bytes/count); globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP); } /** * Report average receive length in bytes (4096 last receives) */ void reportReceiveLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){ SignalT<3> signalT; Signal &signal= *(Signal*)&signalT; memset(&signal.header, 0, sizeof(signal.header)); signal.header.theLength = 3; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.theData[0] = NDB_LE_ReceiveBytesStatistic; signal.theData[1] = nodeId; signal.theData[2] = (bytes/count); globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP); } /** * Report connection established */ void reportConnect(void * callbackObj, NodeId nodeId){ SignalT<1> signalT; Signal &signal= *(Signal*)&signalT; memset(&signal.header, 0, sizeof(signal.header)); signal.header.theLength = 1; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.theData[0] = nodeId; globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP); } /** * Report connection broken */ void reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 errNo){ DBUG_ENTER("reportDisconnect"); SignalT signalT; Signal &signal= *(Signal*)&signalT; memset(&signal.header, 0, sizeof(signal.header)); signal.header.theLength = DisconnectRep::SignalLength; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.header.theTrace = TestOrd::TraceDisconnect; DisconnectRep * const rep = (DisconnectRep *)&signal.theData[0]; rep->nodeId = nodeId; rep->err = errNo; globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP); DBUG_VOID_RETURN; } void SignalLoggerManager::printSegmentedSection(FILE * output, const SignalHeader & sh, const SegmentedSectionPtr ptr[3], unsigned i) { fprintf(output, "SECTION %u type=segmented", i); if (i >= 3) { fprintf(output, " *** invalid ***\n"); return; } const Uint32 len = ptr[i].sz; SectionSegment * ssp = ptr[i].p; Uint32 pos = 0; fprintf(output, " size=%u\n", (unsigned)len); while (pos < len) { if (pos > 0 && pos % SectionSegment::DataLength == 0) { ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment); } printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]); } if (len > 0) putc('\n', output); } void transporter_recv_from(void * callbackObj, NodeId nodeId){ globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0; return; }