TS-1067 Reduce to one UDP pipe, and remove BulkIOSend()

Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e669312f
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e669312f
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e669312f

Branch: refs/heads/3.3.x
Commit: e669312fe62a46164457e8b515651795e6cbd607
Parents: 33d92c8
Author: Leif Hedstrom <zw...@apache.org>
Authored: Tue Mar 26 09:48:36 2013 -0600
Committer: Leif Hedstrom <zw...@apache.org>
Committed: Tue Apr 2 13:52:33 2013 -0600

----------------------------------------------------------------------
 iocore/net/P_LibBulkIO.h |    2 -
 iocore/net/P_UDPNet.h    |    6 --
 iocore/net/UnixUDPNet.cc |  154 +++++++++++------------------------------
 proxy/Main.cc            |    2 -
 4 files changed, 40 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/P_LibBulkIO.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_LibBulkIO.h b/iocore/net/P_LibBulkIO.h
index 0d80671..0fe541f 100644
--- a/iocore/net/P_LibBulkIO.h
+++ b/iocore/net/P_LibBulkIO.h
@@ -166,8 +166,6 @@ int BulkIOSplitPkt(struct InkBulkIOState *bioCookie,
 int BulkIOAppendToReqBlock(struct InkBulkIOState *bioCookie,
                            struct InkBulkIOAggregator *bioAggregator, 
Ptr<IOBufferBlock> pkt);
 
-int BulkIOSend(struct InkBulkIOState *bioCookie, uint32_t blkId);
-
 void BulkIORequestComplete(struct InkBulkIOState *bioCookie, struct 
InkBulkIOAggregator *bioAggregator);
 
 void BulkIOFlush(struct InkBulkIOState *bioCookie, struct InkBulkIOAggregator 
*bioAggregator);

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/P_UDPNet.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index 31a2492..239ab34 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -63,9 +63,6 @@ class UDPQueue
 public:
 
   void service(UDPNetHandler *);
-  // these are internal APIs
-  // BulkIOSend uses the BulkIO kernel module for bulk data transfer
-  void BulkIOSend();
   // In the absence of bulk-io, we are down sending packet after packet
   void SendPackets();
   void SendUDPPacket(UDPPacketInternal * p, int32_t pktLen);
@@ -378,9 +375,6 @@ struct InkSinglePipeInfo
 
 struct InkPipeInfo
 {
-  int numPipes;
-  double interfaceMbps;
-  double reliabilityMbps;
   InkSinglePipeInfo *perPipeInfo;
 };
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/iocore/net/UnixUDPNet.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index ccf1fde..029d8cc 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -54,7 +54,6 @@ int32_t g_udp_periodicFreeCancelledPkts;
 int32_t g_udp_numSendRetries;
 
 #include "P_LibBulkIO.h"
-void *G_bulkIOState = NULL;
 
 //
 // Public functions
@@ -90,7 +89,6 @@ initialize_thread_for_udp_net(EThread * thread)
 
   thread->schedule_every(get_UDPPollCont(thread), -9);
   thread->schedule_imm(get_UDPNetHandler(thread));
-  Debug("bulk-io", "%s bulk-io for sends", G_bulkIOState ? "Using" : "Not 
using");
 }
 
 int
@@ -658,8 +656,8 @@ UDPQueue::service(UDPNetHandler * nh)
   double minPktSpacing;
   uint32_t pktSize;
   int64_t pktLen;
-  int i;
   bool addToGuaranteedQ;
+
   (void) nh;
   static ink_hrtime lastPrintTime = ink_get_hrtime_internal();
   static ink_hrtime lastSchedTime = ink_get_hrtime_internal();
@@ -741,46 +739,25 @@ UDPQueue::service(UDPNetHandler * nh)
     lastPrintTime = now;
   }
 
-  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
-    G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
-
-  if (G_bulkIOState) {
-    BulkIOSend();
-  } else {
-    SendPackets();
-  }
+  G_inkPipeInfo.perPipeInfo[0].queue->advanceNow(now);
+  SendPackets();
 
   timeSpent = ink_hrtime_to_msec(now - last_report);
   if (timeSpent > 10000) {
-    // if (bytesSent > 0)
-    // timespent is in milli-seconds
-    char temp[2048], *p1;
     double bw, totalBw;
 
-    temp[0] = '\0';
-    p1 = temp;
-
     if (bytesSent > 0)
       totalBw = (bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
     else
       totalBw = 1.0;
 
-    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
-      // bw is in Mbps
-      bw = (G_inkPipeInfo.perPipeInfo[i].bytesSent * 8.0 * 1000.0) / 
(timeSpent * 1024.0 * 1024.0);
-      snprintf(p1, sizeof(temp), "\t class[%d] = %f Mbps, alloc = %f Mbps, 
(conf'ed = %f, got = %f) \n",
-               i, bw, (G_inkPipeInfo.perPipeInfo[i].bwAlloc / (1024.0 * 
1024.0)),
-               G_inkPipeInfo.perPipeInfo[i].wt, bw / totalBw);
-      p1 += strlen(p1);
-
-      // use a weighted estimator of current usage
-      G_inkPipeInfo.perPipeInfo[i].bwUsed = (4.0 * 
G_inkPipeInfo.perPipeInfo[i].bwUsed / 5.0) + (bw / 5.0);
-      G_inkPipeInfo.perPipeInfo[i].bytesSent = 0;
-      G_inkPipeInfo.perPipeInfo[i].pktsSent = 0;
-    }
-    if (temp[0])
-      Debug("udpnet-bw", "B/w: %f Mbps; breakdown: \n%s", totalBw, temp);
+    // bw is in Mbps
+    bw = (G_inkPipeInfo.perPipeInfo[0].bytesSent * 8.0 * 1000.0) / (timeSpent 
* 1024.0 * 1024.0);
 
+    // use a weighted estimator of current usage
+    G_inkPipeInfo.perPipeInfo[0].bwUsed = (4.0 * 
G_inkPipeInfo.perPipeInfo[0].bwUsed / 5.0) + (bw / 5.0);
+    G_inkPipeInfo.perPipeInfo[0].bytesSent = 0;
+    G_inkPipeInfo.perPipeInfo[0].pktsSent = 0;
 
     bytesSent = 0;
     last_report = now;
@@ -799,23 +776,14 @@ UDPQueue::SendPackets()
   // ink_hrtime send_threshold_time = now + HRTIME_MSECONDS(5);
   // send packets for SLOT_TIME per attempt
   ink_hrtime send_threshold_time = now + SLOT_TIME;
-  int32_t bytesThisSlot = INT_MAX, bytesUsed = 0, reliabilityBytes = 0;
-  int32_t bytesThisPipe, sentOne, i;
+  int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
+  int32_t bytesThisPipe, sentOne;
   int64_t pktLen;
   ink_hrtime timeDelta = 0;
 
   if (now > last_service)
     timeDelta = ink_hrtime_to_msec(now - last_service);
 
-  if (G_inkPipeInfo.numPipes > 0) {
-    bytesThisSlot = (int32_t) (((G_inkPipeInfo.reliabilityMbps * 1024.0 * 
1024.0) / (8.0 * 1000.0)) * timeDelta);
-    if (bytesThisSlot == 0) {
-      // use at most 10% for reliability
-      bytesThisSlot = (int32_t) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 
1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
-      reliabilityBytes = bytesThisSlot;
-    }
-  }
-
   while ((p = reliabilityPktQueue.dequeue()) != NULL) {
     pktLen = p->getPktLength();
     g_udp_bytesPending -= pktLen;
@@ -836,41 +804,35 @@ UDPQueue::SendPackets()
     p->free();
   }
 
-
-  if (G_inkPipeInfo.numPipes > 0)
-    bytesThisSlot = (int32_t) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 
1024.0) /
-                              (8.0 * 1000.0)) * timeDelta - reliabilityBytes);
-  else
-    bytesThisSlot = INT_MAX;
+  bytesThisSlot = INT_MAX;
 
 sendPackets:
   sentOne = false;
   send_threshold_time = now + SLOT_TIME;
-  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
-    bytesThisPipe = (int32_t) (bytesThisSlot * 
G_inkPipeInfo.perPipeInfo[i].wt);
-    while ((bytesThisPipe > 0) && 
(G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time))) {
-      p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
-      pktLen = p->getPktLength();
-      g_udp_bytesPending -= pktLen;
-
-      p->conn->nBytesTodo -= pktLen;
-      p->conn->nBytesDone += pktLen;
-      if (p->conn->shouldDestroy())
-        goto next_pkt;
-      if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
-        goto next_pkt;
-
-      G_inkPipeInfo.perPipeInfo[i].bytesSent += pktLen;
-      SendUDPPacket(p, pktLen);
-      bytesUsed += pktLen;
-      bytesThisPipe -= pktLen;
-    next_pkt:
-      sentOne = true;
-      p->free();
-
-      if (bytesThisPipe < 0)
-        break;
-    }
+  bytesThisPipe = (int32_t) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[0].wt);
+
+  while ((bytesThisPipe > 0) && 
(G_inkPipeInfo.perPipeInfo[0].queue->firstPacket(send_threshold_time))) {
+    p = G_inkPipeInfo.perPipeInfo[0].queue->getFirstPacket();
+    pktLen = p->getPktLength();
+    g_udp_bytesPending -= pktLen;
+
+    p->conn->nBytesTodo -= pktLen;
+    p->conn->nBytesDone += pktLen;
+    if (p->conn->shouldDestroy())
+      goto next_pkt;
+    if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
+      goto next_pkt;
+
+    G_inkPipeInfo.perPipeInfo[0].bytesSent += pktLen;
+    SendUDPPacket(p, pktLen);
+    bytesUsed += pktLen;
+    bytesThisPipe -= pktLen;
+  next_pkt:
+    sentOne = true;
+    p->free();
+
+    if (bytesThisPipe < 0)
+      break;
   }
 
   bytesThisSlot -= bytesUsed;
@@ -878,10 +840,8 @@ sendPackets:
   if ((bytesThisSlot > 0) && (sentOne)) {
     // redistribute the slack...
     now = ink_get_hrtime_internal();
-    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
-      if (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(now) == NULL) {
-        G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
-      }
+    if (G_inkPipeInfo.perPipeInfo[0].queue->firstPacket(now) == NULL) {
+      G_inkPipeInfo.perPipeInfo[0].queue->advanceNow(now);
     }
     goto sendPackets;
   }
@@ -890,9 +850,8 @@ sendPackets:
       (now - lastCleanupTime > 
ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
     uint64_t nbytes = g_udp_bytesPending;
     ink_hrtime startTime = ink_get_hrtime_internal(), endTime;
-    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
-      
G_inkPipeInfo.perPipeInfo[i].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
-    }
+
+    
G_inkPipeInfo.perPipeInfo[0].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
     endTime = ink_get_hrtime_internal();
     Debug("udp-pending-packets", "Did cleanup of %d buckets: %" PRId64 " bytes 
in %" PRId64 " m.sec",
           g_udp_periodicCleanupSlots, nbytes - g_udp_bytesPending, 
(int64_t)ink_hrtime_to_msec(endTime - startTime));
@@ -950,37 +909,6 @@ UDPQueue::SendUDPPacket(UDPPacketInternal * p, int32_t 
pktLen)
   }
 }
 
-#ifndef BULK_IO_SEND_IS_BROKEN
-void
-UDPQueue::BulkIOSend()
-{
-  ink_assert(!"Don't call here...");
-}
-#else
-void
-UDPQueue::BulkIOSend()
-{
-  bool sentOne = false;
-  UDPPacketInternal *p;
-  ink_hrtime now = ink_get_hrtime_internal();
-  ink_hrtime send_threshold_time = now + SLOT_TIME;
-
-  for (int i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
-    while (p = 
G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time)) {
-      p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
-      sentOne = true;
-      Debug("bulk-io-pkt", "Adding a packet...");
-      BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, 
p->conn->getPortNum());
-      bytesSent += p->getPktLength();
-      // Now the packet is "sent"; get rid of it
-      p->free();
-    }
-  }
-  if (sentOne) {
-    BulkIOFlush(G_bulkIOState, &G_bulkIOAggregator);
-  }
-}
-#endif
 
 void
 UDPQueue::send(UDPPacket * p)
@@ -1054,8 +982,6 @@ UDPNetHandler::mainNetEvent(int event, Event * e)
       ink_assert(uc->refcount >= 1);
       next = uc->polling_link.next;
       if (uc->shouldDestroy()) {
-        if (G_inkPipeInfo.numPipes > 0)
-          G_inkPipeInfo.perPipeInfo[uc->pipe_class].count--;
         //changed by YTS Team, yamsat
         //udp_polling->remove(uc,uc->polling_link);
         uc->Release();

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e669312f/proxy/Main.cc
----------------------------------------------------------------------
diff --git a/proxy/Main.cc b/proxy/Main.cc
index 5337748..a22098e 100644
--- a/proxy/Main.cc
+++ b/proxy/Main.cc
@@ -1525,8 +1525,6 @@ main(int argc, char **argv)
   // but I don't know for sure if we still need this.
   G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[1]);
   G_inkPipeInfo.perPipeInfo[0].wt = 1.0;
-  G_inkPipeInfo.numPipes = 0;
-  G_inkPipeInfo.interfaceMbps = 0.0;
 
   init_http_header();
 

Reply via email to