Author: rhs
Date: Tue May  1 20:56:28 2012
New Revision: 1332815

URL: http://svn.apache.org/viewvc?rev=1332815&view=rev
Log:
fixed bugs in socket shutdown; improved transport interface

Modified:
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/tests/engine.py

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1332815&r1=1332814&r2=1332815&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue May  1 20:56:28 2012
@@ -92,14 +92,11 @@ void pn_connection_close(pn_connection_t
 void pn_connection_destroy(pn_connection_t *connection);
 
 // transport
-pn_state_t pn_transport_state(pn_transport_t *transport);
 pn_error_t *pn_transport_error(pn_transport_t *transport);
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available);
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size);
 time_t pn_tick(pn_transport_t *transport, time_t now);
 void pn_trace(pn_transport_t *transport, pn_trace_t trace);
-void pn_transport_open(pn_transport_t *transport);
-void pn_transport_close(pn_transport_t *transport);
 void pn_transport_destroy(pn_transport_t *transport);
 
 // session

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1332815&r1=1332814&r2=1332815&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Tue May  1 20:56:28 2012
@@ -398,7 +398,7 @@ static void pn_connector_read(pn_connect
 {
   ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - 
ctor->input_size, 0);
   if (n <= 0) {
-    printf("disconnected: %zi\n", n);
+    if (n < 0) perror("read");
     ctor->status &= ~PN_SEL_RD;
     ctor->input_eos = true;
   } else {
@@ -421,11 +421,12 @@ static void pn_connector_process_input(p
     } else if (n == 0) {
       break;
     } else {
-      if (n != PN_EOS) {
+      if (n == PN_EOS) {
+        pn_connector_consume(ctor, ctor->input_size);
+      } else {
         printf("error in process_input: %zi\n", n);
       }
       ctor->input_done = true;
-      ctor->output_done = true;
       break;
     }
   }
@@ -438,6 +439,7 @@ static ssize_t pn_connector_read_sasl_he
       fprintf(stderr, "sasl header missmatch: ");
       pn_fprint_data(stderr, ctor->input, ctor->input_size);
       fprintf(stderr, "\n");
+      ctor->output_done = true;
       return PN_ERR;
     } else {
       fprintf(stderr, "    <- AMQP SASL 1.0\n");
@@ -448,6 +450,7 @@ static ssize_t pn_connector_read_sasl_he
     fprintf(stderr, "sasl header missmatch: ");
     pn_fprint_data(stderr, ctor->input, ctor->input_size);
     fprintf(stderr, "\n");
+    ctor->output_done = true;
     return PN_ERR;
   }
 
@@ -473,6 +476,7 @@ static ssize_t pn_connector_read_amqp_he
       fprintf(stderr, "amqp header missmatch: ");
       pn_fprint_data(stderr, ctor->input, ctor->input_size);
       fprintf(stderr, "\n");
+      ctor->output_done = true;
       return PN_ERR;
     } else {
       fprintf(stderr, "    <- AMQP 1.0\n");
@@ -483,6 +487,7 @@ static ssize_t pn_connector_read_amqp_he
     fprintf(stderr, "amqp header missmatch: ");
     pn_fprint_data(stderr, ctor->input, ctor->input_size);
     fprintf(stderr, "\n");
+    ctor->output_done = true;
     return PN_ERR;
   }
 
@@ -492,13 +497,7 @@ static ssize_t pn_connector_read_amqp_he
 static ssize_t pn_connector_read_amqp(pn_connector_t *ctor)
 {
   pn_transport_t *transport = ctor->transport;
-  size_t n = 0;
-  if (ctor->input_size) {
-    n = pn_input(transport, ctor->input, ctor->input_size);
-  } else if (ctor->input_eos) {
-    ctor->input_done = true;
-  }
-  return n;
+  return pn_input(transport, ctor->input, ctor->input_size);
 }
 
 static char *pn_connector_output(pn_connector_t *ctor)
@@ -524,7 +523,6 @@ static void pn_connector_process_output(
         fprintf(stderr, "error in process_output: %zi\n", n);
       }
       ctor->output_done = true;
-      ctor->input_done = true;
       break;
     }
   }
@@ -578,7 +576,6 @@ static ssize_t pn_connector_write_amqp_h
   fprintf(stderr, "    -> AMQP 1.0\n");
   memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8);
   ctor->process_output = pn_connector_write_amqp;
-  pn_transport_open(ctor->transport);
   return 8;
 }
 
@@ -618,6 +615,7 @@ void pn_connector_process(pn_connector_t
       d->fds[idx].revents &= ~POLLOUT;
     }
     if (c->output_size == 0 && c->input_done && c->output_done) {
+      fprintf(stderr, "closed\n");
       pn_connector_close(c);
     }
   }

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1332815&r1=1332814&r2=1332815&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue May  1 20:56:28 
2012
@@ -101,7 +101,9 @@ struct pn_transport_t {
   pn_connection_t *connection;
   pn_dispatcher_t *disp;
   bool open_sent;
+  bool open_rcvd;
   bool close_sent;
+  bool close_rcvd;
   pn_session_state_t *sessions;
   size_t session_capacity;
   pn_session_state_t **channels;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1332815&r1=1332814&r2=1332815&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue May  1 20:56:28 2012
@@ -662,7 +662,9 @@ void pn_transport_init(pn_transport_t *t
   pn_dispatcher_action(transport->disp, CLOSE, "CLOSE", pn_do_close);
 
   transport->open_sent = false;
+  transport->open_rcvd = false;
   transport->close_sent = false;
+  transport->close_rcvd = false;
 
   transport->sessions = NULL;
   transport->session_capacity = 0;
@@ -717,11 +719,6 @@ pn_transport_t *pn_transport(pn_connecti
   }
 }
 
-pn_state_t pn_transport_state(pn_transport_t *transport)
-{
-  return transport->endpoint.state;
-}
-
 pn_error_t *pn_transport_error(pn_transport_t *transport)
 {
   return &transport->endpoint.error;
@@ -1018,12 +1015,12 @@ void pn_do_error(pn_transport_t *transpo
   // XXX: result
   vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap);
   va_end(ap);
-  fprintf(stderr, "ERROR %s %s\n", condition, 
transport->endpoint.error.description);
-  if (!transport->close_sent)
+  if (!transport->close_sent) {
     pn_post_close(transport);
-  PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED);
-  PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED);
+    transport->close_sent = true;
+  }
   transport->disp->halt = true;
+  fprintf(stderr, "ERROR %s %s\n", condition, 
transport->endpoint.error.description);
 }
 
 void pn_do_open(pn_dispatcher_t *disp)
@@ -1286,26 +1283,30 @@ void pn_do_end(pn_dispatcher_t *disp)
 void pn_do_close(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
+  transport->close_rcvd = true;
   PN_SET_REMOTE(transport->connection->endpoint.state, PN_REMOTE_CLOSED);
-  PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED);
 }
 
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
 {
-  if (transport->endpoint.state & PN_LOCAL_UNINIT) {
-    return 0;
-  }
-
-  if (transport->endpoint.state & PN_LOCAL_CLOSED) {
-    return PN_EOS;
+  if (!available) {
+    pn_do_error(transport, "amqp:connection:framing-error", "connection 
aborted");
+    fprintf(stderr, "    <- EOS\n");
+    return PN_ERR;
   }
 
-  if (transport->endpoint.state & PN_REMOTE_CLOSED) {
+  if (transport->close_rcvd) {
     pn_do_error(transport, "amqp:connection:framing-error", "data after 
close");
+    fprintf(stderr, "    <- EOS\n");
     return PN_ERR;
   }
 
-  return pn_dispatcher_input(transport->disp, bytes, available);
+  ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
+  if (n >= 0 && transport->close_rcvd) {
+    fprintf(stderr, "    <- EOS\n");
+    return PN_EOS;
+  }
+  return n;
 }
 
 void pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1645,11 +1646,10 @@ void pn_process(pn_transport_t *transpor
 
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
 {
-  if (!(transport->endpoint.state & PN_LOCAL_UNINIT)) {
-    pn_process(transport);
-  }
+  pn_process(transport);
 
-  if (!transport->disp->available && transport->endpoint.state & 
PN_LOCAL_CLOSED) {
+  if (!transport->disp->available && transport->close_sent) {
+    fprintf(stderr, "    -> EOS\n");
     return PN_EOS;
   }
 

Modified: qpid/proton/trunk/proton-c/tests/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/engine.py?rev=1332815&r1=1332814&r2=1332815&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/tests/engine.py (original)
+++ qpid/proton/trunk/proton-c/tests/engine.py Tue May  1 20:56:28 2012
@@ -27,17 +27,17 @@ from cproton import *
 def pump(t1, t2):
   while True:
     cd, out1 = pn_output(t1, 1024)
-    assert cd >= 0, cd
+    assert cd >= 0 or cd == PN_EOS, (cd, out1)
     cd, out2 = pn_output(t2, 1024)
-    assert cd >= 0, cd
+    assert cd >= 0 or cd == PN_EOS, (cd, out2)
 
     if out1 or out2:
       if out1:
         cd = pn_input(t2, out1)
-        assert cd == len(out1), (cd, out1)
+        assert cd == PN_EOS or cd == len(out1), (cd, out1)
       if out2:
         cd = pn_input(t1, out2)
-        assert cd == len(out2), (cd, out2)
+        assert cd == PN_EOS or cd == len(out2), (cd, out2)
     else:
       return
 
@@ -51,8 +51,6 @@ class Test:
     self.c2 = pn_connection()
     self.t1 = pn_transport(self.c1)
     self.t2 = pn_transport(self.c2)
-    pn_transport_open(self.t1)
-    pn_transport_open(self.t2)
     trc = os.environ.get("PN_TRACE_FRM")
     if trc and trc.lower() in ("1", "2", "yes", "true"):
       pn_trace(self.t1, PN_TRACE_FRM)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to