Hi Azeez, Please find the below sample code.
public class Test { public static void main(String[] args) { Thread t = new Thread(new Test.ContentFiller()); t.start(); } static class ContentFiller implements Runnable { @Override public void run() { CarbonMessage carbonMessage = new DefaultCarbonMessage(); String val = "This is a test"; byte[] array = val.getBytes(); Thread t = new Thread(new ContentReader(carbonMessage)); t.start(); for (int i = 0; i < 1000; i++) { ByteBuffer byteBuffer = ByteBuffer.allocate(array.length); byteBuffer.put(array); System.out.println("Adding content"); carbonMessage.addMessageBody(byteBuffer); } carbonMessage.setEndOfMsgAdded(true); } } static class ContentReader implements Runnable { CarbonMessage carbonMessage; ContentReader(CarbonMessage carbonMessage) { this.carbonMessage = carbonMessage; } @Override public void run() { while (true) { if (carbonMessage.isEmpty() && carbonMessage.isEndOfMsgAdded()) { break; } else { ByteBuffer byteBuffer = carbonMessage.getMessageBody(); System.out.println("Content Reading " + new String(byteBuffer.array())); } } } } } On Thu, Jun 2, 2016 at 9:55 PM, Afkham Azeez <az...@wso2.com> wrote: > This CarbonMessage API is not intuitive. Please share code samples that > demonstrate streaming input handling with CarbonMessage (request). > > On Thu, Jun 2, 2016 at 9:52 PM, Samiyuru Senarathne <samiy...@wso2.com> > wrote: > >> I assume isEndOfMsgAdded only tell that all the chunks are loaded to the >> blocking queue. So still we have to complete the total blocking queue. >> >> On Thu, Jun 2, 2016 at 9:50 PM, Afkham Azeez <az...@wso2.com> wrote: >> >>> while(!carbonMsg.isEndOfMsgAdded) { >>> ByteBuffer chunk = carbonMsg.getMessageBody(); >>> >>> } >>> >>> In the above segment, carbonMsg is the request CarbonMessage and the >>> above code segment doesn't work. Code inside the while loop never executes. >>> >>> >>> On Thu, Jun 2, 2016 at 9:47 PM, Isuru Ranawaka <isu...@wso2.com> wrote: >>> >>>> Hi Azeez, >>>> yes that should work. Anyhow content needs to be added to carbonMsg >>>> from one thread and read to be from another thread. >>>> >>>> thanks >>>> >>>> On Thu, Jun 2, 2016 at 9:23 PM, Afkham Azeez <az...@wso2.com> wrote: >>>> >>>>> Is the following code segment the way to read all the chunks; >>>>> >>>>> while(!carbonMsg.isEndOfMsgAdded) { >>>>> ByteBuffer chunk = carbonMsg.getMessageBody(); >>>>> >>>>> } >>>>> >>>>> On Thu, Jun 2, 2016 at 9:21 PM, Afkham Azeez <az...@wso2.com> wrote: >>>>> >>>>>> Looks like we require changes for input streaming as well. >>>>>> >>>>>> Does CarbonMessage.getMessageBody() block until the next chunk is >>>>>> received? >>>>>> >>>>>> On Thu, Jun 2, 2016 at 9:18 PM, Afkham Azeez <az...@wso2.com> wrote: >>>>>> >>>>>>> Samiyuru/Isuru, >>>>>>> Does this mean that the input streaming in MSF4J is currently >>>>>>> working without any issue and only output streaming requires some work? >>>>>>> >>>>>>> On Tue, May 3, 2016 at 2:29 PM, Isuru Ranawaka <isu...@wso2.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> >>>>>>>> Following are the details on streaming support of carbon transport. >>>>>>>> Basically we can looking to streaming support in request path and >>>>>>>> response >>>>>>>> path separately. >>>>>>>> >>>>>>>> Request Path (Streaming is working) >>>>>>>> >>>>>>>> [image: requestpath.png] >>>>>>>> >>>>>>>> According to above diagram >>>>>>>> >>>>>>>> - >>>>>>>> >>>>>>>> We have blocking queue in the carbon message which keeps the >>>>>>>> content . when headers are received through Netty worker thread it >>>>>>>> will >>>>>>>> create CarbonMessage with a blocking queue and publish carbon >>>>>>>> message to >>>>>>>> engine level thread. >>>>>>>> - >>>>>>>> >>>>>>>> Reference for blocking queue is cached in the connection and >>>>>>>> when content is received it will be filled to that queue from Netty >>>>>>>> worker >>>>>>>> thread. >>>>>>>> - >>>>>>>> >>>>>>>> Meanwhile engine level threads can consume content through >>>>>>>> queue(While IO worker is filling ) and can directly send to file >>>>>>>> system or >>>>>>>> use sender for send messages to external service. >>>>>>>> - >>>>>>>> >>>>>>>> According to that streaming should work in Request path in >>>>>>>> Integration Server or MSF4J without any problem. >>>>>>>> >>>>>>>> >>>>>>>> Response Path (Streaming working scenario) >>>>>>>> >>>>>>>> [image: responsepathworking.png] >>>>>>>> >>>>>>>> >>>>>>>> In Response path basic difference we have is we are handling >>>>>>>> responses through callbacks. >>>>>>>> >>>>>>>> >>>>>>>> - >>>>>>>> >>>>>>>> Similar to Request path architecture Sender side IO thread >>>>>>>> creates a CM when response headers are received and publish to >>>>>>>> engine >>>>>>>> level thread . >>>>>>>> - >>>>>>>> >>>>>>>> Engine level thread calls carbonCallback.done() and waits on >>>>>>>> queue for content. >>>>>>>> - >>>>>>>> >>>>>>>> Meanwhile IO thread writes the content to the Queue . >>>>>>>> - >>>>>>>> >>>>>>>> So writing to Queue and reading from queue happens parallel and >>>>>>>> streaming should work properly. >>>>>>>> - >>>>>>>> >>>>>>>> So streaming is working fine with Integration Server for this >>>>>>>> kind of scenarios. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Response Path (Streaming not working scenario) >>>>>>>> >>>>>>>> [image: responsepathnotworking.png] >>>>>>>> >>>>>>>> This scenario is basically, if we have a Echo mediator or assume >>>>>>>> MSF4J has a service which is reading a file as chunks and writes back >>>>>>>> to >>>>>>>> client . >>>>>>>> >>>>>>>> >>>>>>>> - >>>>>>>> >>>>>>>> Basic difference with the previous one is in this approach >>>>>>>> reading a file chunk or stream and writing that file chunk or >>>>>>>> stream to >>>>>>>> Queue is happened within the same engine level thread as well as >>>>>>>> with >>>>>>>> reading from queue and writing to Listener side Netty worker >>>>>>>> threads.(Same >>>>>>>> thread produce and consume causes for deadlock) >>>>>>>> - >>>>>>>> >>>>>>>> In the previous example reading from stream and filling the >>>>>>>> queue was happened through Sender side IO thread and consuming the >>>>>>>> queue >>>>>>>> and writing to Listener side IO thread was happened through engine >>>>>>>> level >>>>>>>> thread. (Two different threads produce and consume) >>>>>>>> - >>>>>>>> >>>>>>>> Streaming is not working in this kind of scenarios because we >>>>>>>> cannot write to queue and keep polling the queue within single >>>>>>>> thread. >>>>>>>> >>>>>>>> >>>>>>>> We can figure out following solutions and what will be the most >>>>>>>> suitable one. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> - >>>>>>>> >>>>>>>> Introduce another thread for run the callback logic instead of >>>>>>>> running through calling thread. >>>>>>>> >>>>>>>> This will solve the above streaming problem but >>>>>>>> when it comes to general message flow this will add another level of >>>>>>>> thread which will actually does not need. >>>>>>>> >>>>>>>> - >>>>>>>> >>>>>>>> Introduce another method in ResponseCallback which will support >>>>>>>> streaming which does not queuing contents. it will directly call IO >>>>>>>> threads and write contents to IO when chunks are received. This can >>>>>>>> be used >>>>>>>> only within single thread scenario(File reading and writing ). >>>>>>>> - >>>>>>>> >>>>>>>> Introduce another thread for read File stream or call Callback >>>>>>>> from another thread other than to reading thread from MSF4J >>>>>>>> level.Then it >>>>>>>> will be equivalent to how we used it in Integration Server with the >>>>>>>> use of >>>>>>>> Sender. >>>>>>>> >>>>>>>> ThankYou >>>>>>>> >>>>>>>> IsuruR >>>>>>>> >>>>>>>> On Tue, May 3, 2016 at 12:09 PM, Kasun Indrasiri <ka...@wso2.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Had a chat with Ranawaka on this.. seems like we do have couple of >>>>>>>>> ways to handle this. He will share the details. >>>>>>>>> >>>>>>>>> On Mon, May 2, 2016 at 6:13 AM, Afkham Azeez <az...@wso2.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> The problem is, we can't do the next MSF4J release because we >>>>>>>>>> can't lose a feature in a release. This is a blocker for us. We have >>>>>>>>>> plans >>>>>>>>>> to release in the next 2 weeks. >>>>>>>>>> >>>>>>>>>> On Mon, May 2, 2016 at 4:56 PM, Isuru Ranawaka <isu...@wso2.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Azeez, >>>>>>>>>>> >>>>>>>>>>> Currently streaming works if we used both sender side and >>>>>>>>>>> listener side only . But since MSF4J is using only listener side if >>>>>>>>>>> we did >>>>>>>>>>> not spawn separate thread from engine level for writing response >>>>>>>>>>> it will >>>>>>>>>>> not work because request reading and writing happens through same >>>>>>>>>>> thread. >>>>>>>>>>> But with the next release we will fix that. currently we are >>>>>>>>>>> finalizing on >>>>>>>>>>> removing engine level thread model from transport. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> thanks >>>>>>>>>>> IsuruR >>>>>>>>>>> >>>>>>>>>>> On Mon, May 2, 2016 at 3:50 PM, Afkham Azeez <az...@wso2.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Is this working after moving to the new transport framework? >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> *Afkham Azeez* >>>>>>>>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com >>>>>>>>>>>> Member; Apache Software Foundation; http://www.apache.org/ >>>>>>>>>>>> * <http://www.apache.org/>* >>>>>>>>>>>> *email: **az...@wso2.com* <az...@wso2.com> >>>>>>>>>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>>>>>>>>>>> *http://blog.afkham.org* <http://blog.afkham.org> >>>>>>>>>>>> *twitter: **http://twitter.com/afkham_azeez* >>>>>>>>>>>> <http://twitter.com/afkham_azeez> >>>>>>>>>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>>>>>>>>>>> <http://lk.linkedin.com/in/afkhamazeez>* >>>>>>>>>>>> >>>>>>>>>>>> *Lean . Enterprise . Middleware* >>>>>>>>>>>> >>>>>>>>>>>> _______________________________________________ >>>>>>>>>>>> Dev mailing list >>>>>>>>>>>> Dev@wso2.org >>>>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Best Regards >>>>>>>>>>> Isuru Ranawaka >>>>>>>>>>> M: +94714629880 >>>>>>>>>>> Blog : http://isurur.blogspot.com/ >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> *Afkham Azeez* >>>>>>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com >>>>>>>>>> Member; Apache Software Foundation; http://www.apache.org/ >>>>>>>>>> * <http://www.apache.org/>* >>>>>>>>>> *email: **az...@wso2.com* <az...@wso2.com> >>>>>>>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>>>>>>>>> *http://blog.afkham.org* <http://blog.afkham.org> >>>>>>>>>> *twitter: **http://twitter.com/afkham_azeez* >>>>>>>>>> <http://twitter.com/afkham_azeez> >>>>>>>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>>>>>>>>> <http://lk.linkedin.com/in/afkhamazeez>* >>>>>>>>>> >>>>>>>>>> *Lean . Enterprise . Middleware* >>>>>>>>>> >>>>>>>>>> _______________________________________________ >>>>>>>>>> Dev mailing list >>>>>>>>>> Dev@wso2.org >>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Kasun Indrasiri >>>>>>>>> Software Architect >>>>>>>>> WSO2, Inc.; http://wso2.com >>>>>>>>> lean.enterprise.middleware >>>>>>>>> >>>>>>>>> cell: +94 77 556 5206 >>>>>>>>> Blog : http://kasunpanorama.blogspot.com/ >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Best Regards >>>>>>>> Isuru Ranawaka >>>>>>>> M: +94714629880 >>>>>>>> Blog : http://isurur.blogspot.com/ >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> *Afkham Azeez* >>>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com >>>>>>> Member; Apache Software Foundation; http://www.apache.org/ >>>>>>> * <http://www.apache.org/>* >>>>>>> *email: **az...@wso2.com* <az...@wso2.com> >>>>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>>>>>> *http://blog.afkham.org* <http://blog.afkham.org> >>>>>>> *twitter: **http://twitter.com/afkham_azeez* >>>>>>> <http://twitter.com/afkham_azeez> >>>>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>>>>>> <http://lk.linkedin.com/in/afkhamazeez>* >>>>>>> >>>>>>> *Lean . Enterprise . Middleware* >>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> *Afkham Azeez* >>>>>> Director of Architecture; WSO2, Inc.; http://wso2.com >>>>>> Member; Apache Software Foundation; http://www.apache.org/ >>>>>> * <http://www.apache.org/>* >>>>>> *email: **az...@wso2.com* <az...@wso2.com> >>>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>>>>> *http://blog.afkham.org* <http://blog.afkham.org> >>>>>> *twitter: **http://twitter.com/afkham_azeez* >>>>>> <http://twitter.com/afkham_azeez> >>>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>>>>> <http://lk.linkedin.com/in/afkhamazeez>* >>>>>> >>>>>> *Lean . Enterprise . Middleware* >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> *Afkham Azeez* >>>>> Director of Architecture; WSO2, Inc.; http://wso2.com >>>>> Member; Apache Software Foundation; http://www.apache.org/ >>>>> * <http://www.apache.org/>* >>>>> *email: **az...@wso2.com* <az...@wso2.com> >>>>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>>>> *http://blog.afkham.org* <http://blog.afkham.org> >>>>> *twitter: **http://twitter.com/afkham_azeez* >>>>> <http://twitter.com/afkham_azeez> >>>>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>>>> <http://lk.linkedin.com/in/afkhamazeez>* >>>>> >>>>> *Lean . Enterprise . Middleware* >>>>> >>>> >>>> >>>> >>>> -- >>>> Best Regards >>>> Isuru Ranawaka >>>> M: +94714629880 >>>> Blog : http://isurur.blogspot.com/ >>>> >>> >>> >>> >>> -- >>> *Afkham Azeez* >>> Director of Architecture; WSO2, Inc.; http://wso2.com >>> Member; Apache Software Foundation; http://www.apache.org/ >>> * <http://www.apache.org/>* >>> *email: **az...@wso2.com* <az...@wso2.com> >>> * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * >>> *http://blog.afkham.org* <http://blog.afkham.org> >>> *twitter: **http://twitter.com/afkham_azeez* >>> <http://twitter.com/afkham_azeez> >>> *linked-in: **http://lk.linkedin.com/in/afkhamazeez >>> <http://lk.linkedin.com/in/afkhamazeez>* >>> >>> *Lean . Enterprise . Middleware* >>> >> >> >> >> -- >> Samiyuru Senarathne >> *Software Engineer* >> Mobile : +94 (0) 71 134 6087 >> samiy...@wso2.com >> > > > > -- > *Afkham Azeez* > Director of Architecture; WSO2, Inc.; http://wso2.com > Member; Apache Software Foundation; http://www.apache.org/ > * <http://www.apache.org/>* > *email: **az...@wso2.com* <az...@wso2.com> > * cell: +94 77 3320919 <%2B94%2077%203320919>blog: * > *http://blog.afkham.org* <http://blog.afkham.org> > *twitter: **http://twitter.com/afkham_azeez* > <http://twitter.com/afkham_azeez> > *linked-in: **http://lk.linkedin.com/in/afkhamazeez > <http://lk.linkedin.com/in/afkhamazeez>* > > *Lean . Enterprise . Middleware* > -- Best Regards Isuru Ranawaka M: +94714629880 Blog : http://isurur.blogspot.com/
_______________________________________________ Dev mailing list Dev@wso2.org http://wso2.org/cgi-bin/mailman/listinfo/dev