Author: ravi
Date: Sun Aug 28 20:09:19 2016
New Revision: 1758147

URL: http://svn.apache.org/viewvc?rev=1758147&view=rev
Log:
Fix by Senduran for SYNAPSE-1015

Added:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
Modified:
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
    
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/Pipe.java
 Sun Aug 28 20:09:19 2016
@@ -23,10 +23,9 @@ import org.apache.http.nio.IOControl;
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.ContentEncoder;
 import org.apache.synapse.transport.passthru.config.BaseConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
 
 import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -43,9 +42,9 @@ public class Pipe {
     private IOControl consumerIoControl;
 
     /** Fixed size buffer to read and write data */
-    private ByteBuffer buffer;
+    private ControlledByteBuffer buffer;
 
-    private ByteBuffer outputBuffer;
+    private ControlledByteBuffer outputBuffer;
 
     private boolean producerCompleted = false;
 
@@ -70,13 +69,10 @@ public class Pipe {
 
        private boolean hasHttpProducer = true;
 
-    private AtomicBoolean inBufferInputMode = new AtomicBoolean(true);
-    private AtomicBoolean outBufferInputMode;
-
     private ByteBufferInputStream inputStream;
     private ByteBufferOutputStream outputStream;
 
-    public Pipe(IOControl producerIoControl, ByteBuffer buffer,
+    public Pipe(IOControl producerIoControl, ControlledByteBuffer buffer,
                 String name, BaseConfiguration baseConfig) {
         this.producerIoControl = producerIoControl;
         this.buffer = buffer;
@@ -84,7 +80,7 @@ public class Pipe {
         this.baseConfig = baseConfig;
     }
 
-    public Pipe(ByteBuffer buffer, String name, BaseConfiguration baseConfig) {
+    public Pipe(ControlledByteBuffer buffer, String name, BaseConfiguration baseConfig) {
         this.buffer = buffer;
         this.name += "_" + name;
         this.baseConfig = baseConfig;
@@ -118,14 +114,11 @@ public class Pipe {
         }
 
         lock.lock();
-        ByteBuffer consumerBuffer;
-        AtomicBoolean inputMode;
+        ControlledByteBuffer consumerBuffer;
         if (outputBuffer != null) {
             consumerBuffer = outputBuffer;
-            inputMode = outBufferInputMode;
         } else {
             consumerBuffer = buffer;
-            inputMode = inBufferInputMode;
         }
         try {
             // if producer at error we have to stop the encoding and return immediately
@@ -134,9 +127,9 @@ public class Pipe {
                 return -1;
             }
 
-            setOutputMode(consumerBuffer, inputMode);
-            int bytesWritten = encoder.write(consumerBuffer);
-            setInputMode(consumerBuffer, inputMode);
+            setOutputMode(consumerBuffer);
+            int bytesWritten = encoder.write(consumerBuffer.getByteBuffer());
+            setInputMode(consumerBuffer);
 
             if (consumerBuffer.position() == 0) {
                 if (outputBuffer == null) {
@@ -179,8 +172,8 @@ public class Pipe {
 
         lock.lock();
         try {
-            setInputMode(buffer, inBufferInputMode);
-            int bytesRead = decoder.read(buffer);
+            setInputMode(buffer);
+            int bytesRead = decoder.read(buffer.getByteBuffer());
 
             // if consumer is at error we have to let the producer complete
             if (consumerError) {
@@ -258,7 +251,6 @@ public class Pipe {
     public synchronized OutputStream getOutputStream() {
         if (outputStream == null) {
             outputBuffer = baseConfig.getBufferFactory().getBuffer();
-            outBufferInputMode = new AtomicBoolean(true);
             outputStream = new ByteBufferOutputStream();
         }
         return outputStream;
@@ -267,7 +259,7 @@ public class Pipe {
     public synchronized void setSerializationComplete(boolean serializationComplete) {
         if (!this.serializationComplete) {
             this.serializationComplete = serializationComplete;
-            if (consumerIoControl != null && hasData(outputBuffer, outBufferInputMode)) {
+            if (consumerIoControl != null && hasData(outputBuffer)) {
                 consumerIoControl.requestOutput();
             }
         }
@@ -284,7 +276,7 @@ public class Pipe {
        this.rawSerializationComplete = rawSerializationComplete;
     }
 
-       public ByteBuffer getBuffer() {
+       public ControlledByteBuffer getBuffer() {
         return buffer;
     }
 
@@ -292,8 +284,8 @@ public class Pipe {
         return serializationComplete;
     }
 
-    private void setInputMode(ByteBuffer buffer, AtomicBoolean inputMode) {
-        if (inputMode.compareAndSet(false, true)) {
+    private void setInputMode(ControlledByteBuffer buffer) {
+        if (buffer.setInputMode()) {
             if (buffer.hasRemaining()) {
                 buffer.compact();
             } else {
@@ -302,16 +294,16 @@ public class Pipe {
         }
     }
 
-    private void setOutputMode(ByteBuffer buffer, AtomicBoolean inputMode) {
-        if (inputMode.compareAndSet(true, false)) {
+    private void setOutputMode(ControlledByteBuffer buffer) {
+        if (buffer.setOutputMode()) {
             buffer.flip();
         }
     }
 
-    private boolean hasData(ByteBuffer buffer, AtomicBoolean inputMode) {
+    private boolean hasData(ControlledByteBuffer buffer) {
         lock.lock();
         try {
-            setOutputMode(buffer, inputMode);
+            setOutputMode(buffer);
             return buffer.hasRemaining();
         } finally {
             lock.unlock();
@@ -324,7 +316,7 @@ public class Pipe {
         public int read() throws IOException {
             lock.lock();
             try {
-                if (!hasData(buffer, inBufferInputMode)) {
+                if (!hasData(buffer)) {
                     waitForData();
                     if (producerError) {
                         return -1;
@@ -346,13 +338,13 @@ public class Pipe {
 
             lock.lock();
             try {
-                if (!hasData(buffer, inBufferInputMode)) {
+                if (!hasData(buffer)) {
                     waitForData();
                 }
                 if (isEndOfStream()) {
                     return -1;
                 }
-                setOutputMode(buffer, inBufferInputMode);
+                setOutputMode(buffer);
                 int chunk = len;
                 if (chunk > buffer.remaining()) {
                     chunk = buffer.remaining();
@@ -368,7 +360,7 @@ public class Pipe {
             lock.lock();
             try {
                 try {
-                    while (!hasData(buffer, inBufferInputMode) && !producerCompleted) {
+                    while (!hasData(buffer) && !producerCompleted) {
                         if (producerError) {
                             break;
                         }
@@ -384,7 +376,7 @@ public class Pipe {
         }
 
         private boolean isEndOfStream() {
-            return !hasData(buffer, inBufferInputMode) && producerCompleted;
+            return !hasData(buffer) && producerCompleted;
         }
     }
 
@@ -394,10 +386,10 @@ public class Pipe {
         public void write(int b) throws IOException {
             lock.lock();
             try {
-                setInputMode(outputBuffer, outBufferInputMode);
+                setInputMode(outputBuffer);
                 if (!outputBuffer.hasRemaining()) {
                     flushContent();
-                    setInputMode(outputBuffer, outBufferInputMode);
+                    setInputMode(outputBuffer);
                 }
                 outputBuffer.put((byte) b);
             } finally {
@@ -411,7 +403,7 @@ public class Pipe {
             }
             lock.lock();
             try {
-                setInputMode(outputBuffer, outBufferInputMode);
+                setInputMode(outputBuffer);
                 int remaining = len;
                 while (remaining > 0) {
                     if (!outputBuffer.hasRemaining()) {
@@ -420,7 +412,7 @@ public class Pipe {
                             buffer.clear();
                             break;
                         }
-                        setInputMode(outputBuffer, outBufferInputMode);
+                        setInputMode(outputBuffer);
                     }
                     int chunk = Math.min(remaining, outputBuffer.remaining());
                     outputBuffer.put(b, off, chunk);
@@ -441,7 +433,7 @@ public class Pipe {
             
             try {
                 try {
-                                       while (hasData(outputBuffer, outBufferInputMode)) {
+                                       while (hasData(outputBuffer)) {
                         if(consumerError) {
                             break;
                         }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceContext.java
 Sun Aug 28 20:09:19 2016
@@ -21,6 +21,7 @@ package org.apache.synapse.transport.pas
 
 import org.apache.http.nio.NHttpConnection;
 import org.apache.synapse.transport.passthru.config.SourceConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
 
 import java.nio.ByteBuffer;
 
@@ -85,7 +86,7 @@ public class SourceContext {
         }
 
         if (writer != null) {
-            ByteBuffer buffer = writer.getBuffer();
+            ControlledByteBuffer buffer = writer.getBuffer();
             buffer.clear();
             sourceConfiguration.getBufferFactory().release(buffer);
         }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java
 Sun Aug 28 20:09:19 2016
@@ -22,6 +22,7 @@ package org.apache.synapse.transport.pas
 import org.apache.axis2.context.MessageContext;
 import org.apache.http.nio.NHttpConnection;
 import org.apache.synapse.transport.passthru.config.TargetConfiguration;
+import org.apache.synapse.transport.passthru.util.ControlledByteBuffer;
 
 import java.nio.ByteBuffer;
 
@@ -114,7 +115,7 @@ public class TargetContext {
         }
 
         if (writer != null) {
-            ByteBuffer buffer = writer.getBuffer();
+            ControlledByteBuffer buffer = writer.getBuffer();
             buffer.clear();
             targetConfiguration.getBufferFactory().release(buffer);
         }

Modified: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java?rev=1758147&r1=1758146&r2=1758147&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
 (original)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/BufferFactory.java
 Sun Aug 28 20:09:19 2016
@@ -21,13 +21,12 @@ package org.apache.synapse.transport.pas
 import org.apache.http.nio.util.ByteBufferAllocator;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class BufferFactory {
 
-    private volatile ByteBuffer [] buffers;
+    private volatile ControlledByteBuffer [] buffers;
 
     private volatile int marker = -1;
 
@@ -45,34 +44,36 @@ public class BufferFactory {
             this.allocator = HeapByteBufferAllocator.INSTANCE;
         }
 
-        buffers = new ByteBuffer[size];
+        buffers = new ControlledByteBuffer[size];
     }
 
-    public ByteBuffer getBuffer() {
+    public ControlledByteBuffer getBuffer() {
         if (marker == -1) {
-            return allocator.allocate(bufferSize);
+            return new ControlledByteBuffer(allocator.allocate(bufferSize));
         } else {
             try {
                 lock.lock();
                 if (marker >= 0) {
-                    ByteBuffer b = buffers[marker];
-                    b.clear();
+                    ControlledByteBuffer controlledByteBuffer = buffers[marker];
+                    controlledByteBuffer.clear();
+                    controlledByteBuffer.forceSetInputMode();
                     buffers[marker] = null;
                     marker--;
-                    return b;
+                    return controlledByteBuffer;
                 }
             } finally {
                 lock.unlock();
             }
         }
-        return allocator.allocate(bufferSize);
+        return new ControlledByteBuffer(allocator.allocate(bufferSize));
     }
 
-    public void release(ByteBuffer buffer) {
+    public void release(ControlledByteBuffer buffer) {
         try {
             lock.lock();
             if (marker < buffers.length - 1) {
                 buffer.clear();
+                buffer.forceSetInputMode();
                 buffers[++marker] = buffer;
             }
         } finally {

Added: 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java?rev=1758147&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
 (added)
+++ 
synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/ControlledByteBuffer.java
 Sun Aug 28 20:09:19 2016
@@ -0,0 +1,143 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.transport.passthru.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link ByteBuffer} paired with a flag recording whether the buffer is
+ * currently in input mode or in output mode.
+ *
+ * <p>The flag only records the mode; it does not reposition the buffer.
+ * Callers decide whether to {@link #flip()}, {@link #compact()} or
+ * {@link #clear()} the buffer when a mode switch is reported.
+ */
+public class ControlledByteBuffer {
+
+    /** The wrapped buffer; the buffer operations below delegate to it. */
+    private final ByteBuffer byteBuffer;
+
+    /** {@code true} in input mode, {@code false} in output mode. */
+    private final AtomicBoolean inputMode = new AtomicBoolean(true);
+
+    /**
+     * Wraps the given buffer; the wrapper starts in input mode.
+     *
+     * @param byteBuffer the buffer to wrap
+     */
+    public ControlledByteBuffer(ByteBuffer byteBuffer) {
+        this.byteBuffer = byteBuffer;
+    }
+
+    /**
+     * Returns the wrapped buffer for APIs that need a raw {@link ByteBuffer}.
+     *
+     * @return the wrapped buffer
+     */
+    public ByteBuffer getByteBuffer() {
+        return this.byteBuffer;
+    }
+
+    /**
+     * Switches the mode flag from output mode to input mode.
+     *
+     * @return {@code true} if the flag was switched, {@code false} if the
+     *         buffer was already in input mode
+     */
+    public boolean setInputMode() {
+        return this.inputMode.compareAndSet(false, true);
+    }
+
+    /**
+     * Switches the mode flag from input mode to output mode.
+     *
+     * @return {@code true} if the flag was switched, {@code false} if the
+     *         buffer was already in output mode
+     */
+    public boolean setOutputMode() {
+        return this.inputMode.compareAndSet(true, false);
+    }
+
+    /**
+     * Unconditionally resets the mode flag to input mode.
+     *
+     * <p>The existing flag is updated in place instead of being replaced, so
+     * every caller keeps operating on the same {@link AtomicBoolean}.
+     */
+    public void forceSetInputMode() {
+        this.inputMode.set(true);
+    }
+
+    /** Delegates to {@link ByteBuffer#flip()}. */
+    public void flip() {
+        this.byteBuffer.flip();
+    }
+
+    /** Delegates to {@link ByteBuffer#clear()}; the mode flag is not touched. */
+    public void clear() {
+        this.byteBuffer.clear();
+    }
+
+    /** Delegates to {@link ByteBuffer#compact()}. */
+    public void compact() {
+        this.byteBuffer.compact();
+    }
+
+    /** Delegates to {@link ByteBuffer#position()}. */
+    public int position() {
+        return this.byteBuffer.position();
+    }
+
+    /** Delegates to {@link ByteBuffer#put(byte)}. */
+    public void put(byte b) {
+        this.byteBuffer.put(b);
+    }
+
+    /** Delegates to {@link ByteBuffer#putInt(int)}. */
+    public void putInt(int value) {
+        this.byteBuffer.putInt(value);
+    }
+
+    /** Delegates to {@link ByteBuffer#put(byte[], int, int)}; returns the wrapped buffer. */
+    public ByteBuffer put(byte[] src, int offset, int length) {
+        return this.byteBuffer.put(src, offset, length);
+    }
+
+    /** Delegates to {@link ByteBuffer#hasRemaining()}. */
+    public boolean hasRemaining() {
+        return this.byteBuffer.hasRemaining();
+    }
+
+    /** Delegates to {@link ByteBuffer#get()}. */
+    public byte get() {
+        return this.byteBuffer.get();
+    }
+
+    /** Delegates to {@link ByteBuffer#get(byte[], int, int)}; returns the wrapped buffer. */
+    public ByteBuffer get(byte[] dst, int offset, int length) {
+        return this.byteBuffer.get(dst, offset, length);
+    }
+
+    /** Delegates to {@link ByteBuffer#remaining()}. */
+    public int remaining() {
+        return this.byteBuffer.remaining();
+    }
+}


Reply via email to