ARTEMIS-569 add missing method

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9733a62
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9733a62
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9733a62

Branch: refs/heads/master
Commit: e9733a6223601db306d719a8e2b456e6c0494feb
Parents: d73d6d2
Author: jbertram <[email protected]>
Authored: Mon Jun 27 16:41:59 2016 -0500
Committer: Clebert Suconic <[email protected]>
Committed: Mon Jun 27 18:23:45 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientProducerImpl.java        |  6 +++---
 .../protocol/core/impl/ActiveMQSessionContext.java  | 16 ++++++++++++++++
 .../artemis/spi/core/remoting/SessionContext.java   |  7 +++++++
 3 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 99e593f..b0998b8 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -371,10 +371,10 @@ public class ClientProducerImpl implements 
ClientProducerInternal {
       context.open();
       try {
 
-         for (int pos = 0; pos < bodySize; ) {
+         for (long pos = 0; pos < bodySize; ) {
             final boolean lastChunk;
 
-            final int chunkLength = Math.min((int) (bodySize - pos), 
minLargeMessageSize);
+            final int chunkLength = (int) Math.min((bodySize - pos), (long) 
minLargeMessageSize);
 
             final ActiveMQBuffer bodyBuffer = 
ActiveMQBuffers.fixedBuffer(chunkLength);
 
@@ -385,7 +385,7 @@ public class ClientProducerImpl implements 
ClientProducerInternal {
             lastChunk = pos >= bodySize;
             SendAcknowledgementHandler messageHandler = lastChunk ? handler : 
null;
 
-            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, 
sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, 
messageHandler);
+            int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, 
-1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
 
             credits.acquireCredits(creditsUsed);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 9f0edce..f49a22a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -438,6 +438,22 @@ public class ActiveMQSessionContext extends SessionContext 
{
    }
 
    @Override
+   public int sendServerLargeMessageChunk(MessageInternal msgI, long 
messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, 
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket = new 
SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, 
messageBodySize, messageHandler);
+
+      if (requiresResponse) {
+         // When sending it blocking, only the last chunk will be blocking.
+         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      }
+      else {
+         sessionChannel.send(chunkPacket);
+      }
+
+      return chunkPacket.getPacketSize();
+   }
+
+   @Override
    public void sendACK(boolean individual,
                        boolean block,
                        final ClientConsumer consumer,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 774dbfe..175360c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -150,6 +150,13 @@ public abstract class SessionContext {
                                              int reconnectID,
                                              SendAcknowledgementHandler 
messageHandler) throws ActiveMQException;
 
+   public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
+                                                   long messageBodySize,
+                                                   boolean sendBlocking,
+                                                   boolean lastChunk,
+                                                   byte[] chunk,
+                                                   SendAcknowledgementHandler 
messageHandler) throws ActiveMQException;
+
    public abstract void setSendAcknowledgementHandler(final 
SendAcknowledgementHandler handler);
 
    public abstract void createSharedQueue(SimpleString address,

Reply via email to