TS-1067 Reduce to one UDP pipe, and remove BulkIOSend()
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e669312f Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e669312f Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e669312f Branch: refs/heads/3.3.x Commit: e669312fe62a46164457e8b515651795e6cbd607 Parents: 33d92c8 Author: Leif Hedstrom <zw...@apache.org> Authored: Tue Mar 26 09:48:36 2013 -0600 Committer: Leif Hedstrom <zw...@apache.org> Committed: Tue Apr 2 13:52:33 2013 -0600 ---------------------------------------------------------------------- iocore/net/P_LibBulkIO.h | 2 - iocore/net/P_UDPNet.h | 6 -- iocore/net/UnixUDPNet.cc | 154 +++++++++++------------------------------ proxy/Main.cc | 2 - 4 files changed, 40 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/P_LibBulkIO.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_LibBulkIO.h b/iocore/net/P_LibBulkIO.h index 0d80671..0fe541f 100644 --- a/iocore/net/P_LibBulkIO.h +++ b/iocore/net/P_LibBulkIO.h @@ -166,8 +166,6 @@ int BulkIOSplitPkt(struct InkBulkIOState *bioCookie, int BulkIOAppendToReqBlock(struct InkBulkIOState *bioCookie, struct InkBulkIOAggregator *bioAggregator, Ptr<IOBufferBlock> pkt); -int BulkIOSend(struct InkBulkIOState *bioCookie, uint32_t blkId); - void BulkIORequestComplete(struct InkBulkIOState *bioCookie, struct InkBulkIOAggregator *bioAggregator); void BulkIOFlush(struct InkBulkIOState *bioCookie, struct InkBulkIOAggregator *bioAggregator); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/P_UDPNet.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index 31a2492..239ab34 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -63,9 +63,6 @@ class UDPQueue public: void service(UDPNetHandler *); - // these are internal APIs - // BulkIOSend uses the BulkIO kernel module for bulk data transfer - void BulkIOSend(); // In the absence of bulk-io, we are down sending packet after packet void SendPackets(); void SendUDPPacket(UDPPacketInternal * p, int32_t pktLen); @@ -378,9 +375,6 @@ struct InkSinglePipeInfo struct InkPipeInfo { - int numPipes; - double interfaceMbps; - double reliabilityMbps; InkSinglePipeInfo *perPipeInfo; }; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/UnixUDPNet.cc ---------------------------------------------------------------------- diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index ccf1fde..029d8cc 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -54,7 +54,6 @@ int32_t g_udp_periodicFreeCancelledPkts; int32_t g_udp_numSendRetries; #include "P_LibBulkIO.h" -void *G_bulkIOState = NULL; // // Public functions @@ -90,7 +89,6 @@ initialize_thread_for_udp_net(EThread * thread) thread->schedule_every(get_UDPPollCont(thread), -9); thread->schedule_imm(get_UDPNetHandler(thread)); - Debug("bulk-io", "%s bulk-io for sends", G_bulkIOState ? "Using" : "Not using"); } int @@ -658,8 +656,8 @@ UDPQueue::service(UDPNetHandler * nh) double minPktSpacing; uint32_t pktSize; int64_t pktLen; - int i; bool addToGuaranteedQ; + (void) nh; static ink_hrtime lastPrintTime = ink_get_hrtime_internal(); static ink_hrtime lastSchedTime = ink_get_hrtime_internal(); @@ -741,46 +739,25 @@ UDPQueue::service(UDPNetHandler * nh) lastPrintTime = now; } - for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) - G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now); - - if (G_bulkIOState) { - BulkIOSend(); - } else { - SendPackets(); - } + G_inkPipeInfo.perPipeInfo[0].queue->advanceNow(now); + SendPackets(); timeSpent = ink_hrtime_to_msec(now - last_report); if (timeSpent > 10000) { - // if (bytesSent > 0) - // timespent is in milli-seconds - char temp[2048], *p1; double bw, totalBw; - temp[0] = '\0'; - p1 = temp; - if (bytesSent > 0) totalBw = (bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0); else totalBw = 1.0; - for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) { - // bw is in Mbps - bw = (G_inkPipeInfo.perPipeInfo[i].bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0); - snprintf(p1, sizeof(temp), "\t class[%d] = %f Mbps, alloc = %f Mbps, (conf'ed = %f, got = %f) \n", - i, bw, (G_inkPipeInfo.perPipeInfo[i].bwAlloc / (1024.0 * 1024.0)), - G_inkPipeInfo.perPipeInfo[i].wt, bw / totalBw); - p1 += strlen(p1); - - // use a weighted estimator of current usage - G_inkPipeInfo.perPipeInfo[i].bwUsed = (4.0 * G_inkPipeInfo.perPipeInfo[i].bwUsed / 5.0) + (bw / 5.0); - G_inkPipeInfo.perPipeInfo[i].bytesSent = 0; - G_inkPipeInfo.perPipeInfo[i].pktsSent = 0; - } - if (temp[0]) - Debug("udpnet-bw", "B/w: %f Mbps; breakdown: \n%s", totalBw, temp); + // bw is in Mbps + bw = (G_inkPipeInfo.perPipeInfo[0].bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0); + // use a weighted estimator of current usage + G_inkPipeInfo.perPipeInfo[0].bwUsed = (4.0 * G_inkPipeInfo.perPipeInfo[0].bwUsed / 5.0) + (bw / 5.0); + G_inkPipeInfo.perPipeInfo[0].bytesSent = 0; + G_inkPipeInfo.perPipeInfo[0].pktsSent = 0; bytesSent = 0; last_report = now; @@ -799,23 +776,14 @@ UDPQueue::SendPackets() // ink_hrtime send_threshold_time = now + HRTIME_MSECONDS(5); // send packets for SLOT_TIME per attempt ink_hrtime send_threshold_time = now + SLOT_TIME; - int32_t bytesThisSlot = INT_MAX, bytesUsed = 0, reliabilityBytes = 0; - int32_t bytesThisPipe, sentOne, i; + int32_t bytesThisSlot = INT_MAX, bytesUsed = 0; + int32_t bytesThisPipe, sentOne; int64_t pktLen; ink_hrtime timeDelta = 0; if (now > last_service) timeDelta = ink_hrtime_to_msec(now - last_service); - if (G_inkPipeInfo.numPipes > 0) { - bytesThisSlot = (int32_t) (((G_inkPipeInfo.reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta); - if (bytesThisSlot == 0) { - // use at most 10% for reliability - bytesThisSlot = (int32_t) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1); - reliabilityBytes = bytesThisSlot; - } - } - while ((p = reliabilityPktQueue.dequeue()) != NULL) { pktLen = p->getPktLength(); g_udp_bytesPending -= pktLen; @@ -836,41 +804,35 @@ UDPQueue::SendPackets() p->free(); } - - if (G_inkPipeInfo.numPipes > 0) - bytesThisSlot = (int32_t) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) / - (8.0 * 1000.0)) * timeDelta - reliabilityBytes); - else - bytesThisSlot = INT_MAX; + bytesThisSlot = INT_MAX; sendPackets: sentOne = false; send_threshold_time = now + SLOT_TIME; - for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) { - bytesThisPipe = (int32_t) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[i].wt); - while ((bytesThisPipe > 0) && (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time))) { - p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket(); - pktLen = p->getPktLength(); - g_udp_bytesPending -= pktLen; - - p->conn->nBytesTodo -= pktLen; - p->conn->nBytesDone += pktLen; - if (p->conn->shouldDestroy()) - goto next_pkt; - if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) - goto next_pkt; - - G_inkPipeInfo.perPipeInfo[i].bytesSent += pktLen; - SendUDPPacket(p, pktLen); - bytesUsed += pktLen; - bytesThisPipe -= pktLen; - next_pkt: - sentOne = true; - p->free(); - - if (bytesThisPipe < 0) - break; - } + bytesThisPipe = (int32_t) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[0].wt); + + while ((bytesThisPipe > 0) && (G_inkPipeInfo.perPipeInfo[0].queue->firstPacket(send_threshold_time))) { + p = G_inkPipeInfo.perPipeInfo[0].queue->getFirstPacket(); + pktLen = p->getPktLength(); + g_udp_bytesPending -= pktLen; + + p->conn->nBytesTodo -= pktLen; + p->conn->nBytesDone += pktLen; + if (p->conn->shouldDestroy()) + goto next_pkt; + if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) + goto next_pkt; + + G_inkPipeInfo.perPipeInfo[0].bytesSent += pktLen; + SendUDPPacket(p, pktLen); + bytesUsed += pktLen; + bytesThisPipe -= pktLen; + next_pkt: + sentOne = true; + p->free(); + + if (bytesThisPipe < 0) + break; } bytesThisSlot -= bytesUsed; @@ -878,10 +840,8 @@ sendPackets: if ((bytesThisSlot > 0) && (sentOne)) { // redistribute the slack... now = ink_get_hrtime_internal(); - for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) { - if (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(now) == NULL) { - G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now); - } + if (G_inkPipeInfo.perPipeInfo[0].queue->firstPacket(now) == NULL) { + G_inkPipeInfo.perPipeInfo[0].queue->advanceNow(now); } goto sendPackets; } @@ -890,9 +850,8 @@ sendPackets: (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) { uint64_t nbytes = g_udp_bytesPending; ink_hrtime startTime = ink_get_hrtime_internal(), endTime; - for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) { - G_inkPipeInfo.perPipeInfo[i].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots); - } + + G_inkPipeInfo.perPipeInfo[0].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots); endTime = ink_get_hrtime_internal(); Debug("udp-pending-packets", "Did cleanup of %d buckets: %" PRId64 " bytes in %" PRId64 " m.sec", g_udp_periodicCleanupSlots, nbytes - g_udp_bytesPending, (int64_t)ink_hrtime_to_msec(endTime - startTime)); @@ -950,37 +909,6 @@ UDPQueue::SendUDPPacket(UDPPacketInternal * p, int32_t pktLen) } } -#ifndef BULK_IO_SEND_IS_BROKEN -void -UDPQueue::BulkIOSend() -{ - ink_assert(!"Don't call here..."); -} -#else -void -UDPQueue::BulkIOSend() -{ - bool sentOne = false; - UDPPacketInternal *p; - ink_hrtime now = ink_get_hrtime_internal(); - ink_hrtime send_threshold_time = now + SLOT_TIME; - - for (int i = 0; i < G_inkPipeInfo.numPipes + 1; i++) { - while (p = G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time)) { - p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket(); - sentOne = true; - Debug("bulk-io-pkt", "Adding a packet..."); - BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->conn->getPortNum()); - bytesSent += p->getPktLength(); - // Now the packet is "sent"; get rid of it - p->free(); - } - } - if (sentOne) { - BulkIOFlush(G_bulkIOState, &G_bulkIOAggregator); - } -} -#endif void UDPQueue::send(UDPPacket * p) @@ -1054,8 +982,6 @@ UDPNetHandler::mainNetEvent(int event, Event * e) ink_assert(uc->refcount >= 1); next = uc->polling_link.next; if (uc->shouldDestroy()) { - if (G_inkPipeInfo.numPipes > 0) - G_inkPipeInfo.perPipeInfo[uc->pipe_class].count--; //changed by YTS Team, yamsat //udp_polling->remove(uc,uc->polling_link); uc->Release(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/proxy/Main.cc ---------------------------------------------------------------------- diff --git a/proxy/Main.cc b/proxy/Main.cc index 5337748..a22098e 100644 --- a/proxy/Main.cc +++ b/proxy/Main.cc @@ -1525,8 +1525,6 @@ main(int argc, char **argv) // but I don't know for sure if we still need this. G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[1]); G_inkPipeInfo.perPipeInfo[0].wt = 1.0; - G_inkPipeInfo.numPipes = 0; - G_inkPipeInfo.interfaceMbps = 0.0; init_http_header();