Author: rhs Date: Thu Feb 5 06:41:54 2009 New Revision: 741024 URL: http://svn.apache.org/viewvc?rev=741024&view=rev Log: QPID-1646: implemented handlers for producer flow control signals
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=741024&r1=741023&r2=741024&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Thu Feb 5 06:41:54 2009 @@ -22,8 +22,9 @@ import static org.apache.qpid.transport.Connection.State.OPEN; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -63,11 +64,14 @@ @Override public void connectionStart(Connection conn, ConnectionStart start) { + Map clientProperties = new HashMap(); + clientProperties.put("qpid.session_flow", 1); + List<Object> mechanisms = start.getMechanisms(); if (mechanisms == null || mechanisms.isEmpty()) { conn.connectionStartOk - (Collections.EMPTY_MAP, null, null, conn.getLocale()); + (clientProperties, null, null, conn.getLocale()); return; } @@ -86,7 +90,7 @@ byte[] response = sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null; conn.connectionStartOk - (Collections.EMPTY_MAP, sc.getMechanismName(), response, + (clientProperties, sc.getMechanismName(), response, conn.getLocale()); } catch (SaslException e) @@ -132,7 +136,7 @@ { throw new UnsupportedOperationException(); } - + /** * Currently the spec specified the min and max for heartbeat using secs */ Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=741024&r1=741023&r2=741024&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Feb 5 06:41:54 2009 @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.apache.qpid.transport.Option.*; import static org.apache.qpid.transport.Session.State.*; @@ -99,6 +101,10 @@ private State state = NEW; + // transfer flow control + private volatile boolean flowControl = false; + private Semaphore credit = new Semaphore(0); + Session(Connection connection, Binary name, long expiry) { this.connection = connection; @@ -166,6 +172,41 @@ } } + void setFlowControl(boolean value) + { + flowControl = value; + } + + void addCredit(int value) + { + credit.release(value); + } + + void drainCredit() + { + credit.drainPermits(); + } + + void acquireCredit() + { + if (flowControl) + { + try + { + if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS)) + { + throw new SessionException + ("timed out waiting for message credit"); + } + } + catch (InterruptedException e) + { + throw new SessionException + ("interrupted while waiting for credit", null, e); + } + } + } + private void initReceiver() { synchronized (processedLock) @@ -428,6 +469,11 @@ { if (m.getEncodedTrack() == Frame.L4) { + if (m.hasPayload()) + { + acquireCredit(); + } + synchronized (commands) { if (state != OPEN && state != CLOSED) Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=741024&r1=741023&r2=741024&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Thu Feb 5 06:41:54 2009 @@ -146,4 +146,42 @@ ssn.getSessionListener().message(ssn, xfr); } + @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm) + { + if ("".equals(sfm.getDestination()) && + MessageFlowMode.CREDIT.equals(sfm.getFlowMode())) + { + ssn.setFlowControl(true); + } + else + { + super.messageSetFlowMode(ssn, sfm); + } + } + + @Override public void messageFlow(Session ssn, MessageFlow flow) + { + if ("".equals(flow.getDestination()) && + MessageCreditUnit.MESSAGE.equals(flow.getUnit())) + { + ssn.addCredit((int) flow.getValue()); + } + else + { + super.messageFlow(ssn, flow); + } + } + + @Override public void messageStop(Session ssn, MessageStop stop) + { + if ("".equals(stop.getDestination())) + { + ssn.drainCredit(); + } + else + { + super.messageStop(ssn, stop); + } + } + } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org