Thanks Ram. This is helpful.
On Fri, Dec 23, 2016 at 9:30 PM, Munagala Ramanath <r...@datatorrent.com> wrote:

> There are multiple approaches, each with its own tradeoffs. Here is a first step:
>
> A1. Create a pair of in-memory non-transient queues to hold the tuples (non-transient
> because we want them to be checkpointed and restored on recovery from failure).
>
> A2. Create a separate thread that waits for a trigger and then writes the entire queue
> to the external system as a single commit -- that way, if it fails midway, the
> transaction will be rolled back and the thread can repeat the attempt to commit when
> it comes back. Since this process does not unduly delay the main operator thread,
> your operator will not be killed by the App Master. When the commit completes, the
> thread goes quiescent again, waiting for the next trigger.
>
> A3. In the main operator thread, write records to the queue as they come in; when you
> detect the end of a committable batch, set the trigger and switch to writing new
> tuples to the second queue. When the second queue is ready to go, swap the two queues
> and trigger the worker thread again.
>
> Clearly, there are some tricky points to keep in mind:
>
> 1. Some careful synchronization of the two threads is needed to make this work.
> 2. If the inbound data rate is larger than the rate at which data can be committed,
> you'll need to partition the output operator into as many replicas as needed to
> ensure that the worker thread completes before the second queue is ready in each
> partition.
> 3. Checkpointing can take a significant amount of time as the queues get larger.
>
> Treat this as an exploratory first attempt where you get a feel for operator speeds,
> data rates, external system speeds, etc.
>
> You can then explore other options (if necessary) to replace the queues with some
> other storage mechanism such as *ManagedState*; there are examples of operators using
> *ManagedState* under subdirectories of
> https://github.com/apache/apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/
>
> Ram
>
> On Fri, Dec 23, 2016 at 4:56 PM, Apex User <knsc...@gmail.com> wrote:
>
>> Dear Team,
>>
>> I am performing a batch operation. As per the use case, I cannot commit the
>> transaction from the output operator to the external system until all the batch data
>> has been processed. Once all the data is processed, the output operator receives one
>> bulky collection object as a single tuple. The output operator is then supposed to
>> iterate over the collection and commit the records to the external system one by one.
>>
>> If the collection size is about 1K, everything works fine; however, as the size
>> grows, the output operator dies. I am expecting about 100K records in that
>> collection, delivered to the output operator as a single tuple. I understand that
>> writing so many records in a single execution of the process method is causing the
>> problem. Can someone please suggest an approach to resolve this?
>>
>> public final transient DefaultInputPort<List<String>> input = new DefaultInputPort<List<String>>() {
>>   @Override
>>   public void process(List<String> tuple) {
>>     // write each object to the external system one by one
>>   }
>> };
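For reference, here is a minimal sketch of the double-queue pattern from steps A1-A3, written against Apex's BaseOperator and DefaultInputPort. Several things are assumptions for illustration only: the operator name, the commitBatch() method standing in for the external-system transaction, and the policy (taken from the original question) that each incoming List is one complete, committable batch.

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

public class DoubleQueueOutputOperator extends BaseOperator {
  // A1: non-transient state so the platform checkpoints and restores it.
  // (A production version must also keep the worker from mutating
  // drainQueue while a checkpoint is being taken -- tricky point 1.)
  private Queue<String> fillQueue = new ArrayDeque<String>();
  private Queue<String> drainQueue = new ArrayDeque<String>();
  private boolean drainReady;

  // Threads and locks cannot be checkpointed; rebuild them in setup().
  private transient Thread worker;
  private transient volatile boolean shutdown;
  private transient Object trigger;

  // A3: each incoming List is assumed here to be one complete batch.
  public final transient DefaultInputPort<List<String>> input =
      new DefaultInputPort<List<String>>() {
    @Override
    public void process(List<String> batch) {
      fillQueue.addAll(batch);
      synchronized (trigger) {
        if (drainReady) {
          // Previous commit still running: leave the data in fillQueue;
          // it goes out with the next swap (see tricky point 2).
          return;
        }
        Queue<String> tmp = drainQueue;  // swap the two queues
        drainQueue = fillQueue;
        fillQueue = tmp;
        drainReady = true;
        trigger.notifyAll();             // wake the worker
      }
    }
  };

  @Override
  public void setup(OperatorContext context) {
    trigger = new Object();
    shutdown = false;
    worker = new Thread(new Runnable() {
      @Override
      public void run() {
        // A2: wait for the trigger, commit the whole queue as one
        // transaction, then go quiescent until the next trigger.
        while (!shutdown) {
          synchronized (trigger) {
            while (!drainReady && !shutdown) {
              try {
                trigger.wait();
              } catch (InterruptedException e) {
                return;
              }
            }
          }
          if (shutdown) {
            return;
          }
          try {
            commitBatch(drainQueue);  // hypothetical single-commit write
            synchronized (trigger) {
              drainQueue.clear();
              drainReady = false;
            }
          } catch (Exception e) {
            // External system rolled the transaction back; loop and retry.
          }
        }
      }
    }, "batch-committer");
    worker.start();
  }

  @Override
  public void teardown() {
    shutdown = true;
    synchronized (trigger) {
      trigger.notifyAll();
    }
  }

  private void commitBatch(Queue<String> batch) throws Exception {
    // Hypothetical: open one transaction against the external system,
    // write every record, commit once; any failure rolls it all back.
  }
}

Because drainReady and the two queues are checkpointed together, an operator restored after a mid-commit failure comes back with drainReady still set, so the worker's first act is to retry the pending commit -- the recovery behavior step A2 relies on.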