Hi Azeez,

Please find the sample code below.

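import java.nio.ByteBuffer;

// Package names below assume the carbon-messaging library.
import org.wso2.carbon.messaging.CarbonMessage;
import org.wso2.carbon.messaging.DefaultCarbonMessage;

public class Test {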

    public static void main(String[] args) {
        Thread t = new Thread(new Test.ContentFiller());
        t.start();
    }

    static class ContentFiller implements Runnable {

        @Override
        public void run() {
            CarbonMessage carbonMessage = new DefaultCarbonMessage();
            String val = "This is a test";
            byte[] array = val.getBytes();
            Thread t = new Thread(new ContentReader(carbonMessage));
            t.start();
            for (int i = 0; i < 1000; i++) {

                ByteBuffer byteBuffer = ByteBuffer.allocate(array.length);
                byteBuffer.put(array);
                // Reset the position so readers that use get() also see the content.
                byteBuffer.flip();
                System.out.println("Adding content");
                carbonMessage.addMessageBody(byteBuffer);

            }
            carbonMessage.setEndOfMsgAdded(true);
        }
    }

    static class ContentReader implements Runnable {

        CarbonMessage carbonMessage;

        ContentReader(CarbonMessage carbonMessage) {
            this.carbonMessage = carbonMessage;
        }

        @Override
        public void run() {
            while (true) {
                if (carbonMessage.isEmpty() && carbonMessage.isEndOfMsgAdded()) {
                    break;
                } else {
                    ByteBuffer byteBuffer = carbonMessage.getMessageBody();
                    System.out.println("Content Reading " + new String(byteBuffer.array()));
                }
            }
        }
    }
}
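
For anyone who wants to try the same pattern without the carbon-messaging dependency, here is a minimal self-contained sketch. ChunkedMessage is a hypothetical stand-in written for this example, not the real CarbonMessage, and pollMessageBody with a timeout is my own assumption so that the reader cannot block forever if the end-of-message flag is set just after it last checked the queue.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StreamingSketch {

    // Hypothetical stand-in for CarbonMessage: a chunk queue plus an end-of-message flag.
    static class ChunkedMessage {
        private final BlockingQueue<ByteBuffer> chunks = new LinkedBlockingQueue<>();
        private volatile boolean endOfMsgAdded;

        void addMessageBody(ByteBuffer chunk) { chunks.add(chunk); }
        void setEndOfMsgAdded(boolean value) { endOfMsgAdded = value; }
        boolean isEndOfMsgAdded() { return endOfMsgAdded; }
        boolean isEmpty() { return chunks.isEmpty(); }

        // Waits up to timeoutMillis for a chunk; returns null if none arrived.
        ByteBuffer pollMessageBody(long timeoutMillis) throws InterruptedException {
            return chunks.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Reads until the producer has finished AND the queue has been drained.
    static int drain(ChunkedMessage message) throws InterruptedException {
        int count = 0;
        while (!(message.isEndOfMsgAdded() && message.isEmpty())) {
            ByteBuffer chunk = message.pollMessageBody(50);
            if (chunk != null) {
                count++;
            }
        }
        return count;
    }

    // Fills the message from one thread and reads it from the calling thread.
    static int run(int chunkCount) {
        ChunkedMessage message = new ChunkedMessage();
        byte[] payload = "This is a test".getBytes(StandardCharsets.UTF_8);
        Thread filler = new Thread(() -> {
            for (int i = 0; i < chunkCount; i++) {
                message.addMessageBody(ByteBuffer.wrap(payload.clone()));
            }
            message.setEndOfMsgAdded(true);
        });
        filler.start();
        try {
            int read = drain(message);
            filler.join();
            return read;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while draining", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Chunks read: " + run(1000));
    }
}
```

The loop condition is the important part: checking only the end-of-message flag can skip chunks that are still queued, so the reader stops only when the flag is set and the queue is empty.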

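The single-thread problem from my earlier mail (quoted further below) can also be reproduced in isolation. This is only a sketch under assumptions: it uses a plain bounded ArrayBlockingQueue with a made-up capacity of 4 rather than the transport's own queue, and it uses offer with a timeout where a real put() would simply block, so that the example terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class SingleThreadQueueSketch {

    // Assumed capacity for illustration only.
    static final int CAPACITY = 4;

    // One thread tries to produce everything before it consumes anything.
    // Returns how many chunks were accepted before the queue filled up.
    static int produceOnSingleThread(int chunkCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        int accepted = 0;
        try {
            for (int i = 0; i < chunkCount; i++) {
                // put() would block here forever once the queue is full,
                // because the only possible consumer is this same thread.
                if (!queue.offer("chunk-" + i, 100, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    // A separate producer thread fills the queue while the caller consumes it.
    static int produceAndConsumeOnTwoThreads(int chunkCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < chunkCount; i++) {
                    queue.put("chunk-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int consumed = 0;
        try {
            while (consumed < chunkCount) {
                queue.take();
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Single thread accepted: " + produceOnSingleThread(10));
        System.out.println("Two threads consumed: " + produceAndConsumeOnTwoThreads(10));
    }
}
```

With 10 chunks, the single-thread version stalls after the queue's 4 slots are used, while the two-thread version moves all 10 through, which is the same reason the listener-only case needs a second thread.
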
On Thu, Jun 2, 2016 at 9:55 PM, Afkham Azeez <az...@wso2.com> wrote:

> This CarbonMessage API is not intuitive. Please share code samples that
> demonstrate streaming input handling with CarbonMessage (request).
>
> On Thu, Jun 2, 2016 at 9:52 PM, Samiyuru Senarathne <samiy...@wso2.com>
> wrote:
>
>> I assume isEndOfMsgAdded only tells us that all the chunks have been
>> loaded into the blocking queue. So we still have to drain the whole
>> blocking queue.
>>
>> On Thu, Jun 2, 2016 at 9:50 PM, Afkham Azeez <az...@wso2.com> wrote:
>>
>>> while(!carbonMsg.isEndOfMsgAdded()) {
>>>   ByteBuffer chunk = carbonMsg.getMessageBody();
>>>
>>> }
>>>
>>> In the segment above, carbonMsg is the request CarbonMessage, and the
>>> segment doesn't work: the code inside the while loop never executes.
>>>
>>>
>>> On Thu, Jun 2, 2016 at 9:47 PM, Isuru Ranawaka <isu...@wso2.com> wrote:
>>>
>>>> Hi Azeez,
>>>> Yes, that should work. However, content needs to be added to carbonMsg
>>>> from one thread and read from another thread.
>>>>
>>>> thanks
>>>>
>>>> On Thu, Jun 2, 2016 at 9:23 PM, Afkham Azeez <az...@wso2.com> wrote:
>>>>
>>>>> Is the following code segment the way to read all the chunks?
>>>>>
>>>>> while(!carbonMsg.isEndOfMsgAdded()) {
>>>>>   ByteBuffer chunk = carbonMsg.getMessageBody();
>>>>>
>>>>> }
>>>>>
>>>>> On Thu, Jun 2, 2016 at 9:21 PM, Afkham Azeez <az...@wso2.com> wrote:
>>>>>
>>>>>> Looks like we require changes for input streaming as well.
>>>>>>
>>>>>> Does CarbonMessage.getMessageBody() block until the next chunk is
>>>>>> received?
>>>>>>
>>>>>> On Thu, Jun 2, 2016 at 9:18 PM, Afkham Azeez <az...@wso2.com> wrote:
>>>>>>
>>>>>>> Samiyuru/Isuru,
>>>>>>> Does this mean that the input streaming in MSF4J is currently
>>>>>>> working without any issue and only output streaming requires some work?
>>>>>>>
>>>>>>> On Tue, May 3, 2016 at 2:29 PM, Isuru Ranawaka <isu...@wso2.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Following are the details on streaming support in the carbon transport.
>>>>>>>> Basically, we can look at streaming support in the request path and the
>>>>>>>> response path separately.
>>>>>>>>
>>>>>>>> Request Path (Streaming is working)
>>>>>>>>
>>>>>>>> [image: requestpath.png]
>>>>>>>>
>>>>>>>> According to the above diagram:
>>>>>>>>
>>>>>>>>    - We have a blocking queue in the carbon message which holds the
>>>>>>>>      content. When headers are received through a Netty worker thread,
>>>>>>>>      it creates a CarbonMessage with a blocking queue and publishes the
>>>>>>>>      carbon message to an engine level thread.
>>>>>>>>    - A reference to the blocking queue is cached in the connection, and
>>>>>>>>      when content is received it is added to that queue from the Netty
>>>>>>>>      worker thread.
>>>>>>>>    - Meanwhile, engine level threads can consume content from the queue
>>>>>>>>      (while the IO worker is filling it) and can send it directly to the
>>>>>>>>      file system or use the sender to send messages to an external
>>>>>>>>      service.
>>>>>>>>    - Accordingly, streaming should work in the request path in
>>>>>>>>      Integration Server or MSF4J without any problem.
>>>>>>>>
>>>>>>>>
>>>>>>>> Response Path (Streaming working scenario)
>>>>>>>>
>>>>>>>> [image: responsepathworking.png]
>>>>>>>>
>>>>>>>>
>>>>>>>> In the response path, the basic difference is that we handle
>>>>>>>> responses through callbacks.
>>>>>>>>
>>>>>>>>    - Similar to the request path architecture, the sender side IO
>>>>>>>>      thread creates a CarbonMessage (CM) when response headers are
>>>>>>>>      received and publishes it to an engine level thread.
>>>>>>>>    - The engine level thread calls carbonCallback.done() and waits on
>>>>>>>>      the queue for content.
>>>>>>>>    - Meanwhile, the IO thread writes the content to the queue.
>>>>>>>>    - So writing to the queue and reading from it happen in parallel,
>>>>>>>>      and streaming should work properly.
>>>>>>>>    - So streaming works fine with Integration Server for this
>>>>>>>>      kind of scenario.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Response Path (Streaming not working scenario)
>>>>>>>>
>>>>>>>> [image: responsepathnotworking.png]
>>>>>>>>
>>>>>>>> This is basically the scenario where we have an Echo mediator, or
>>>>>>>> where MSF4J has a service that reads a file in chunks and writes it
>>>>>>>> back to the client.
>>>>>>>>
>>>>>>>>    - The basic difference from the previous scenario is that here,
>>>>>>>>      reading a file chunk or stream and writing it to the queue happens
>>>>>>>>      within the same engine level thread that also reads from the queue
>>>>>>>>      and writes to the listener side Netty worker threads. (The same
>>>>>>>>      thread producing and consuming causes a deadlock.)
>>>>>>>>    - In the previous example, reading from the stream and filling the
>>>>>>>>      queue happened on the sender side IO thread, while consuming the
>>>>>>>>      queue and writing to the listener side IO thread happened on the
>>>>>>>>      engine level thread. (Two different threads produce and consume.)
>>>>>>>>    - Streaming does not work in this kind of scenario because we
>>>>>>>>      cannot write to the queue and keep polling it within a single
>>>>>>>>      thread.
>>>>>>>>
>>>>>>>>
>>>>>>>> We can consider the following solutions and decide which is the most
>>>>>>>> suitable one.
>>>>>>>>
>>>>>>>>    - Introduce another thread to run the callback logic instead of
>>>>>>>>      running it on the calling thread. This will solve the above
>>>>>>>>      streaming problem, but for the general message flow it adds another
>>>>>>>>      level of threading which is not actually needed.
>>>>>>>>    - Introduce another method in ResponseCallback that supports
>>>>>>>>      streaming without queuing the content. It will directly call the IO
>>>>>>>>      threads and write content to IO as chunks are received. This can be
>>>>>>>>      used only in the single-thread scenario (file reading and writing).
>>>>>>>>    - Introduce another thread at the MSF4J level to read the file
>>>>>>>>      stream, or call the callback from a thread other than the reading
>>>>>>>>      thread. It will then be equivalent to how we use it in Integration
>>>>>>>>      Server with the Sender.
>>>>>>>>
>>>>>>>> Thank you
>>>>>>>>
>>>>>>>> IsuruR
>>>>>>>>
>>>>>>>> On Tue, May 3, 2016 at 12:09 PM, Kasun Indrasiri <ka...@wso2.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Had a chat with Ranawaka on this. It seems we have a couple of
>>>>>>>>> ways to handle this. He will share the details.
>>>>>>>>>
>>>>>>>>> On Mon, May 2, 2016 at 6:13 AM, Afkham Azeez <az...@wso2.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The problem is, we can't do the next MSF4J release because we
>>>>>>>>>> can't lose a feature in a release. This is a blocker for us. We have
>>>>>>>>>> plans to release in the next 2 weeks.
>>>>>>>>>>
>>>>>>>>>> On Mon, May 2, 2016 at 4:56 PM, Isuru Ranawaka <isu...@wso2.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Azeez,
>>>>>>>>>>>
>>>>>>>>>>> Currently streaming works only if we use both the sender side and
>>>>>>>>>>> the listener side. But since MSF4J uses only the listener side, it
>>>>>>>>>>> will not work unless we spawn a separate thread at the engine level
>>>>>>>>>>> for writing the response, because request reading and writing happen
>>>>>>>>>>> on the same thread. We will fix that in the next release. Currently
>>>>>>>>>>> we are finalizing the removal of the engine level thread model from
>>>>>>>>>>> the transport.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>> IsuruR
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 2, 2016 at 3:50 PM, Afkham Azeez <az...@wso2.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is this working after moving to the new transport framework?
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> *Afkham Azeez*
>>>>>>>>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com
>>>>>>>>>>>> Member; Apache Software Foundation; http://www.apache.org/
>>>>>>>>>>>> * <http://www.apache.org/>*
>>>>>>>>>>>> *email: **az...@wso2.com* <az...@wso2.com>
>>>>>>>>>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: *
>>>>>>>>>>>> *http://blog.afkham.org* <http://blog.afkham.org>
>>>>>>>>>>>> *twitter: **http://twitter.com/afkham_azeez*
>>>>>>>>>>>> <http://twitter.com/afkham_azeez>
>>>>>>>>>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez
>>>>>>>>>>>> <http://lk.linkedin.com/in/afkhamazeez>*
>>>>>>>>>>>>
>>>>>>>>>>>> *Lean . Enterprise . Middleware*
>>>>>>>>>>>>
>>>>>>>>>>>> _______________________________________________
>>>>>>>>>>>> Dev mailing list
>>>>>>>>>>>> Dev@wso2.org
>>>>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>> Isuru Ranawaka
>>>>>>>>>>> M: +94714629880
>>>>>>>>>>> Blog : http://isurur.blogspot.com/
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kasun Indrasiri
>>>>>>>>> Software Architect
>>>>>>>>> WSO2, Inc.; http://wso2.com
>>>>>>>>> lean.enterprise.middleware
>>>>>>>>>
>>>>>>>>> cell: +94 77 556 5206
>>>>>>>>> Blog : http://kasunpanorama.blogspot.com/
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Samiyuru Senarathne
>> *Software Engineer*
>> Mobile : +94 (0) 71 134 6087
>> samiy...@wso2.com
>>
>
>
>
>



-- 
Best Regards
Isuru Ranawaka
M: +94714629880
Blog : http://isurur.blogspot.com/
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev
