Thanks Ram. This is helpful.

On Fri, Dec 23, 2016 at 9:30 PM, Munagala Ramanath <r...@datatorrent.com>
wrote:

> There are multiple approaches, each with its own tradeoffs. Here is a first
> approach:
>
> A1. Create a pair of in-memory non-transient queues to hold the tuples
>     (non-transient because we want them to be checkpointed and restored on
>     recovery from failure).
> A2. Create a separate thread that waits for a trigger and then writes the
>     entire queue to the external system as a single commit -- that way, if
>     it fails midway, the transaction will be rolled back and the thread can
>     repeat the commit attempt when it comes back. Since this process does
>     not unduly delay the main operator thread, your operator will not be
>     killed by the App Master. When it completes, it goes quiescent again,
>     waiting for the next trigger.
> A3. In the main operator thread, write records to the queue as they come
>     in; when you detect the end of a committable batch, set the trigger
>     flag and switch to writing new tuples to the second queue. When the
>     second queue is ready to go, you can swap the two queues and trigger
>     the worker thread again. (A sketch of this pattern appears below.)
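>
> Here is a minimal sketch of this pattern. It is simplified: the
> end-of-batch test and the external-system commit are placeholders
> (isBatchBoundary, commitAsSingleTransaction) that you would replace with
> your own logic, and a production version must also coordinate the worker
> with checkpointing (tricky point 1 below).
>
>   import java.util.ArrayDeque;
>
>   import com.datatorrent.api.Context.OperatorContext;
>   import com.datatorrent.api.DefaultInputPort;
>   import com.datatorrent.common.util.BaseOperator;
>
>   public class BatchCommitOperator extends BaseOperator {
>     // A1: non-transient queues -- their contents are checkpointed
>     private ArrayDeque<String> fillQueue = new ArrayDeque<>();
>     private ArrayDeque<String> drainQueue = new ArrayDeque<>();
>     private boolean drainPending;  // true while a batch awaits commit
>
>     // threads cannot be checkpointed: the worker is transient and is
>     // recreated in setup() after recovery from failure
>     private transient Thread worker;
>     private transient volatile boolean shutdown;
>
>     public final transient DefaultInputPort<String> input =
>         new DefaultInputPort<String>() {
>       @Override
>       public void process(String tuple) {
>         synchronized (BatchCommitOperator.this) {
>           fillQueue.add(tuple);
>           // A3: at a batch boundary, swap the queues and wake the
>           // worker; if the previous commit is still running, keep
>           // filling -- see point 2 below if that happens often
>           if (isBatchBoundary(tuple) && !drainPending) {
>             ArrayDeque<String> tmp = drainQueue;
>             drainQueue = fillQueue;
>             fillQueue = tmp;
>             drainPending = true;
>             BatchCommitOperator.this.notifyAll();
>           }
>         }
>       }
>     };
>
>     @Override
>     public void setup(OperatorContext context) {
>       shutdown = false;
>       // if drainPending was true at the last checkpoint, the restarted
>       // worker resumes the interrupted commit from the restored queue
>       worker = new Thread(this::drainLoop, "batch-committer");
>       worker.start();
>     }
>
>     // A2: wait for the trigger, then write the whole queue as one commit
>     private void drainLoop() {
>       try {
>         while (!shutdown) {
>           synchronized (this) {
>             while (!drainPending && !shutdown) { wait(); }
>             if (shutdown) { return; }
>           }
>           commitAsSingleTransaction(drainQueue);  // retried until success
>           synchronized (this) {
>             drainQueue.clear();
>             drainPending = false;
>           }
>         }
>       } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>       }
>     }
>
>     private void commitAsSingleTransaction(ArrayDeque<String> batch) {
>       // placeholder: begin a transaction on the external system, write
>       // every record, commit; on failure, roll back, back off, retry
>     }
>
>     private boolean isBatchBoundary(String tuple) {
>       return "END".equals(tuple);  // placeholder end-of-batch marker
>     }
>
>     @Override
>     public void teardown() {
>       synchronized (this) { shutdown = true; notifyAll(); }
>     }
>   }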
>
> Clearly, there are some tricky points to keep in mind:
> 1. Some careful synchronization of the two threads is needed to make this
>    work.
> 2. If the inbound data rate is larger than the rate at which data can be
>    committed, you'll need to partition the output operator into as many
>    replicas as needed to ensure that the worker thread completes before
>    the second queue is ready in each partition (see the sketch after this
>    list).
> 3. Checkpointing can take a significant amount of time as the queues get
>    larger.
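>
> For point 2, a static partition count can be set on the output operator
> from your application's populateDAG(). A sketch, assuming the
> BatchCommitOperator above (the operator name "committer" and the count
> of 4 are illustrative):
>
>   import com.datatorrent.api.Context.OperatorContext;
>   import com.datatorrent.common.partitioner.StatelessPartitioner;
>
>   // inside StreamingApplication.populateDAG(DAG dag, Configuration conf):
>   BatchCommitOperator out =
>       dag.addOperator("committer", new BatchCommitOperator());
>   dag.setAttribute(out, OperatorContext.PARTITIONER,
>       new StatelessPartitioner<BatchCommitOperator>(4));
>
> Each replica then sees roughly a quarter of the inbound rate; size the
> count to the commit rate you measure.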
>
> Treat this as an exploratory first attempt where you get a feel for
> operator speeds,
> data rates, external system speeds, etc.
>
> You can then explore other options (if necessary) to replace the queues
> with some other storage mechanism such as *ManagedState*; there are
> examples of operators using *ManagedState* under subdirectories of
> https://github.com/apache/apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/
>
>
> Ram
>
> On Fri, Dec 23, 2016 at 4:56 PM, Apex User <knsc...@gmail.com> wrote:
>
>> Dear Team,
>>
>> I am performing a batch operation. As per the use case, I cannot commit
>> the transaction from the output operator to the external system until I
>> have processed all of the batch data. Once all the data is processed, the
>> output operator receives a bulky collection object as a single tuple. The
>> output operator is then supposed to iterate over the collection and
>> commit the records to the external system one by one.
>>
>> If the collection size is about 1K, everything works fine; however, as
>> the size increases, the output operator dies. I am expecting about 100K
>> records in that collection, served as a single tuple to the output
>> operator. I understand that writing so many records in a single execution
>> of the process method is causing the issue. Can someone please suggest an
>> approach to resolve it?
>>
>> public final transient DefaultInputPort<List<String>> input =
>>     new DefaultInputPort<List<String>>() {
>>   @Override
>>   public void process(List<String> tuple) {
>>     // write each object to the external system one by one
>>   }
>> };
>>
>>
>
