Author: cmihail
Date: Mon Jul 25 18:45:59 2011
New Revision: 52864

URL: http://svn.reactos.org/svn/reactos?rev=52864&view=rev
Log:
[lwIP/TCPIP]
- Add a packet queue to connection objects in order to buffer packets that have 
arrived but have no pending receive request to be assigned to. We buffer these 
packets ourselves instead of returning ERR_TIMEOUT to lwIP, which could cause 
throughput slowdowns.

Modified:
    branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h
    branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c
    branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c
    branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h
    branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c

Modified: branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h
URL: 
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h?rev=52864&r1=52863&r2=52864&view=diff
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h 
[iso-8859-1] (original)
+++ branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h 
[iso-8859-1] Mon Jul 25 18:45:59 2011
@@ -268,6 +268,8 @@
     LIST_ENTRY SendRequest;    /* Queued send requests */
     LIST_ENTRY ShutdownRequest;/* Queued shutdown requests */
 
+    LIST_ENTRY PacketQueue;    /* Queued received packets waiting to be 
processed */
+
     /* Signals */
     UINT    SignalState;       /* Active signals from oskit */
     

Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c
URL: 
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c?rev=52864&r1=52863&r2=52864&view=diff
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c 
[iso-8859-1] (original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c 
[iso-8859-1] Mon Jul 25 18:45:59 2011
@@ -144,18 +144,32 @@
 TCPFinEventHandler(void *arg, err_t err)
 {
     PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg;
+    const NTSTATUS status = TCPTranslateError(err);
 
     DbgPrint("[IP, TCPFinEventHandler] Called for Connection( 0x%x )-> 
SocketContext = pcb (0x%x)\n", Connection, Connection->SocketContext);
 
     /* Only clear the pointer if the shutdown was caused by an error */
-    if (err != ERR_OK)
+    if ((err != ERR_OK))// && (status != STATUS_REMOTE_DISCONNECT))
     {
         /* We're already closed by the error so we don't want to call 
lwip_close */
         DbgPrint("[IP, TCPFinEventHandler] MAKING Connection( 0x%x )-> 
SocketContext = pcb (0x%x) NULL\n", Connection, Connection->SocketContext);
+
+        // close all possible callbacks
+        /*tcp_arg((PTCP_PCB)Connection->SocketContext, NULL);
+
+        if (((PTCP_PCB)Connection->SocketContext)->state != LISTEN)
+        {
+            tcp_recv((PTCP_PCB)Connection->SocketContext, NULL);
+            tcp_sent((PTCP_PCB)Connection->SocketContext, NULL);
+            tcp_err((PTCP_PCB)Connection->SocketContext, NULL);
+        }
+
+        tcp_accept((PTCP_PCB)Connection->SocketContext, NULL);*/
+
         Connection->SocketContext = NULL;
     }
-    
-    FlushAllQueues(Connection, TCPTranslateError(err));
+
+    FlushAllQueues(Connection, status);
 
     DbgPrint("[IP, TCPFinEventHandler] Done\n");
 }
@@ -305,7 +319,7 @@
 u32_t
 TCPRecvEventHandler(void *arg, struct pbuf *p)
 {
-    PCONNECTION_ENDPOINT Connection = arg;
+    PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg;
     PTDI_BUCKET Bucket;
     PLIST_ENTRY Entry;
     PIRP Irp;

Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c
URL: 
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c?rev=52864&r1=52863&r2=52864&view=diff
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c 
[iso-8859-1] (original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c 
[iso-8859-1] Mon Jul 25 18:45:59 2011
@@ -25,7 +25,7 @@
 
 VOID ConnectionFree(PVOID Object)
 {
-    PCONNECTION_ENDPOINT Connection = Object;
+    PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)Object;
     KIRQL OldIrql;
 
     TI_DbgPrint(DEBUG_TCP, ("Freeing TCP Endpoint\n"));
@@ -41,7 +41,7 @@
 
 PCONNECTION_ENDPOINT TCPAllocateConnectionEndpoint( PVOID ClientContext )
 {
-    PCONNECTION_ENDPOINT Connection =
+    PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)
         ExAllocatePoolWithTag(NonPagedPool, sizeof(CONNECTION_ENDPOINT),
                               CONN_ENDPT_TAG);
     if (!Connection)
@@ -58,6 +58,7 @@
     InitializeListHead(&Connection->ReceiveRequest);
     InitializeListHead(&Connection->SendRequest);
     InitializeListHead(&Connection->ShutdownRequest);
+    InitializeListHead(&Connection->PacketQueue);
 
     /* Save client context pointer */
     Connection->ClientContext = ClientContext;
@@ -407,6 +408,9 @@
 {
     PTDI_BUCKET Bucket;
     KIRQL OldIrql;
+    PUCHAR DataBuffer;
+    UINT DataLen, Received;
+    NTSTATUS Status;
 
     TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Called for %d bytes (on 
socket %x)\n",
                            ReceiveLength, Connection->SocketContext));
@@ -414,29 +418,38 @@
     DbgPrint("[IP, TCPReceiveData] Called for %d bytes (on 
Connection->SocketContext = 0x%x)\n",
                            ReceiveLength, Connection->SocketContext);
 
-    LockObject(Connection, &OldIrql);
-    
-    /* Freed in TCPSocketState */
-    Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), 
TDI_BUCKET_TAG );
-    if( !Bucket )
-    {
-        TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate 
bucket\n"));
+    NdisQueryBuffer(Buffer, &DataBuffer, &DataLen);
+    
+    Status = LibTCPGetDataFromConnectionQueue(Connection, DataBuffer, DataLen, 
&Received);
+
+    if (Status == STATUS_PENDING)
+    {
+        LockObject(Connection, &OldIrql);
+    
+        /* Freed in TCPSocketState */
+        Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), 
TDI_BUCKET_TAG );
+        if( !Bucket )
+        {
+            TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate 
bucket\n"));
+            UnlockObject(Connection, OldIrql);
+
+            return STATUS_NO_MEMORY;
+        }
+    
+        Bucket->Request.RequestNotifyObject = Complete;
+        Bucket->Request.RequestContext = Context;
+        *BytesReceived = 0;
+    
+        InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry );
+        TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n"));
+
         UnlockObject(Connection, OldIrql);
 
-        return STATUS_NO_MEMORY;
-    }
-    
-    Bucket->Request.RequestNotifyObject = Complete;
-    Bucket->Request.RequestContext = Context;
-    *BytesReceived = 0;
-    
-    InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry );
-    TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n"));
-
-    UnlockObject(Connection, OldIrql);
-
-    TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status = 
STATUS_PENDING\n"));
-    DbgPrint("[IP, TCPReceiveData] Leaving. Status = STATUS_PENDING\n");
+        TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status = 
STATUS_PENDING\n"));
+    }
+
+    DbgPrint("[IP, TCPReceiveData] Leaving. Status = %s\n",
+        Status == STATUS_PENDING? "STATUS_PENDING" : "STATUS_SUCCESS");
 
     return STATUS_PENDING;
 }

Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h
URL: 
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h?rev=52864&r1=52863&r2=52864&view=diff
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h 
[iso-8859-1] (original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h 
[iso-8859-1] Mon Jul 25 18:45:59 2011
@@ -6,7 +6,19 @@
 #include "lwip/ip_addr.h"
 #include "tcpip.h"
 
+#ifndef LWIP_TAG
+    #define LWIP_TAG 'PIwl'
+#endif
+
 typedef struct tcp_pcb* PTCP_PCB;
+
+typedef struct _QUEUE_ENTRY
+{
+    struct pbuf *p;
+    LIST_ENTRY ListEntry;
+} QUEUE_ENTRY, *PQUEUE_ENTRY;
+
+NTSTATUS    LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, 
PUCHAR RecvBuffer, UINT RecvLen, UINT *Received);
 
 /* External TCP event handlers */
 extern void TCPConnectEventHandler(void *arg, const err_t err);

Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c
URL: 
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c?rev=52864&r1=52863&r2=52864&view=diff
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] 
(original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] 
Mon Jul 25 18:45:59 2011
@@ -30,6 +30,93 @@
 extern KEVENT TerminationEvent;
 
 static
+void
+LibTCPEmptyQueue(PCONNECTION_ENDPOINT Connection)
+{
+    PLIST_ENTRY Entry;
+    PQUEUE_ENTRY qp = NULL;
+
+    ReferenceObject(Connection);
+
+    
+    while (!IsListEmpty(&Connection->PacketQueue))
+    {
+        DbgPrint("[lwIP, LibTCPEmptyQueue] Removed packet off queue++++\n");
+
+        Entry = RemoveHeadList(&Connection->PacketQueue);
+        qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry);
+        
+        // reenable this later
+        //pbuf_free(qp->p);
+
+        ExFreePoolWithTag(qp, LWIP_TAG);
+    }
+
+    DereferenceObject(Connection);
+}
+
+void LibTCPEnqueuePacket(PCONNECTION_ENDPOINT Connection, struct pbuf *p)
+{
+    PQUEUE_ENTRY qp;
+
+    qp = (PQUEUE_ENTRY)ExAllocatePoolWithTag(NonPagedPool, 
sizeof(QUEUE_ENTRY), LWIP_TAG);
+    qp->p = p;
+
+    ExInterlockedInsertTailList(&Connection->PacketQueue, &qp->ListEntry, 
&Connection->Lock);
+}
+
+PQUEUE_ENTRY LibTCPDequeuePacket(PCONNECTION_ENDPOINT Connection)
+{
+    PLIST_ENTRY Entry;
+    PQUEUE_ENTRY qp = NULL;
+
+    Entry = ExInterlockedRemoveHeadList(&Connection->PacketQueue, 
&Connection->Lock);
+    
+    qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry);
+
+    return qp;
+}
+
+NTSTATUS LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, 
PUCHAR RecvBuffer, UINT RecvLen, UINT *Received)
+{
+    PQUEUE_ENTRY qp;
+    struct pbuf* p;
+    NTSTATUS Status = STATUS_PENDING;
+
+    if (!IsListEmpty(&Connection->PacketQueue))
+    {
+        DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] Getting packet off 
the queue\n");
+
+        qp = LibTCPDequeuePacket(Connection);
+        p = qp->p;
+
+        RecvLen = MIN(p->tot_len, RecvLen);
+
+        for ((*Received) = 0; (*Received) < RecvLen; *Received += p->len, p = 
p->next)
+        {
+            DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] 0x%x: Copying 
%d bytes to 0x%x from 0x%x\n",
+                p, p->len, ((PUCHAR)RecvBuffer) + (*Received), p->payload);
+            
+            RtlCopyMemory(RecvBuffer + (*Received), p->payload, p->len);
+        }
+
+        // reenable this later
+        //pbuf_free(qp->p);
+        ExFreePoolWithTag(qp, LWIP_TAG);
+
+        Status = STATUS_SUCCESS;
+    }
+    else
+    {
+        DbgPrint("[lwIP, LibTCPGetPacketFromQueue] Queue is EMPTY\n");
+
+        Status = STATUS_PENDING;
+    }
+
+    return Status;
+}
+
+static
 BOOLEAN
 WaitForEventSafely(PRKEVENT Event)
 {
@@ -56,7 +143,7 @@
 
 static
 err_t
-InternalSendEventHandler(void *arg, struct tcp_pcb *pcb, const u16_t space)
+InternalSendEventHandler(void *arg, PTCP_PCB pcb, const u16_t space)
 {
     DbgPrint("[lwIP, InternalSendEventHandler] SendEvent (0x%x, 0x%x, %d)\n",
         arg, pcb, (unsigned int)space);
@@ -75,7 +162,7 @@
 
 static
 err_t
-InternalRecvEventHandler(void *arg, struct tcp_pcb *pcb, struct pbuf *p, const 
err_t err)
+InternalRecvEventHandler(void *arg, PTCP_PCB pcb, struct pbuf *p, const err_t 
err)
 {
     u32_t len;
 
@@ -125,8 +212,13 @@
         else
         {
             /* We want lwIP to store the pbuf on its queue for later */
-            DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT\n");
-            return ERR_TIMEOUT;
+            DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT 
queuing pbuf\n");
+
+            LibTCPEnqueuePacket((PCONNECTION_ENDPOINT)arg, p);
+
+            tcp_recved(pcb, p->tot_len);
+
+            return ERR_OK;//ERR_TIMEOUT;//
         }
     }
     else if (err == ERR_OK)
@@ -145,7 +237,7 @@
 
 static
 err_t
-InternalAcceptEventHandler(void *arg, struct tcp_pcb *newpcb, const err_t err)
+InternalAcceptEventHandler(void *arg, PTCP_PCB newpcb, const err_t err)
 {
     DbgPrint("[lwIP, InternalAcceptEventHandler] AcceptEvent arg = 0x%x, 
newpcb = 0x%x, err = %d\n",
         arg, newpcb, (unsigned int)err);
@@ -168,7 +260,7 @@
 
 static
 err_t
-InternalConnectEventHandler(void *arg, struct tcp_pcb *pcb, const err_t err)
+InternalConnectEventHandler(void *arg, PTCP_PCB pcb, const err_t err)
 {
     DbgPrint("[lwIP, InternalConnectEventHandler] ConnectEvent (0x%x, pcb = 
0x%x, err = %d)\n",
         arg, pcb, (unsigned int)err);
@@ -686,8 +778,8 @@
     if (pcb->state != LISTEN)
     {
         tcp_recv(pcb, NULL);
-        //tcp_sent(pcb, NULL);
-        //tcp_err(pcb, NULL);
+        tcp_sent(pcb, NULL);
+        tcp_err(pcb, NULL);
     }
 
     tcp_accept(pcb, NULL);
@@ -705,13 +797,15 @@
     {
         DbgPrint("[lwIP, LibTCPCloseCallback] NULL pcb...bail, bail!!!\n");
         
-        ASSERT(FALSE);
+        //ASSERT(FALSE);
 
         msg->Error = ERR_OK;
         return;
     }
 
     CloseCallbacks((PTCP_PCB)msg->Connection->SocketContext);
+
+    LibTCPEmptyQueue(msg->Connection);
 
     if (((PTCP_PCB)msg->Connection->SocketContext)->state == LISTEN)
     {
@@ -752,6 +846,9 @@
     if (safe)
     {
         CloseCallbacks((PTCP_PCB)Connection->SocketContext);
+        
+        LibTCPEmptyQueue(Connection);
+
         if ( ((PTCP_PCB)Connection->SocketContext)->state == LISTEN )
         {
             DbgPrint("[lwIP, LibTCPClose] Closing a listener\n");


Reply via email to