There are multiple approaches, each with its own tradeoffs. Here is one to
start with:

A1. Create a pair of in-memory, non-transient queues to hold the tuples
     (non-transient because we want them to be checkpointed and restored on
     recovery from failure).
A2. Create a separate thread that waits for a trigger and then writes the
     entire queue to the external system as a single commit. That way, if the
     commit fails midway, the transaction is rolled back and the thread can
     retry the commit when it comes back. Because this work does not unduly
     delay the main operator thread, your operator will not be killed by the
     App Master. When the commit completes, the thread goes quiescent again,
     waiting for the next trigger.
A3. In the main operator thread, append records to the queue as they come
     in; when you detect the end of a committable batch, set the trigger flag
     and switch to writing new tuples to the second queue. When the second
     queue is ready to go, swap the two queues and trigger the worker thread
     again.
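A minimal sketch of A1-A3 in plain Java, under stated assumptions: ExternalSink
is a hypothetical interface (not an Apex or Malhar API) whose commitAll() is
assumed to write a whole batch in one transaction, and DoubleBufferedWriter,
tryHandOff() and close() are illustrative names. It leaves out the operator
lifecycle; in a real operator the thread would be started in setup() and the
two buffers would be non-transient fields so they are checkpointed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the external system: commitAll is ASSUMED to write the
// whole batch in one transaction (rolled back on failure) and not to keep the list.
interface ExternalSink {
  void commitAll(List<String> batch) throws Exception;
}

// Hypothetical sketch of A1-A3. In a real operator the two lists would be
// non-transient fields (checkpointed) and the worker Thread a transient one.
class DoubleBufferedWriter {
  private final ExternalSink sink;
  private final Object lock = new Object();
  private List<String> active = new ArrayList<>();   // A3: filled by the operator thread
  private List<String> inFlight = new ArrayList<>(); // A2: committed by the worker
  private boolean triggered;                         // the trigger flag
  private boolean stopped;
  private final Thread worker;

  DoubleBufferedWriter(ExternalSink sink) {
    this.sink = sink;
    this.worker = new Thread(this::runWorker, "batch-committer");
    worker.start();
  }

  // Called from process(): only the operator thread touches `active`; no I/O here.
  void add(String record) {
    active.add(record);
  }

  // Call ONLY at the end of a committable batch. Returns false (records stay in
  // `active`) if the worker is still committing the previous batch.
  boolean tryHandOff() {
    synchronized (lock) {
      if (triggered || active.isEmpty()) {
        return false;
      }
      List<String> empty = inFlight;   // cleared by the worker after its last commit
      inFlight = active;               // swap the two queues
      active = empty;
      triggered = true;
      lock.notifyAll();                // wake the worker
      return true;
    }
  }

  private void runWorker() {
    while (true) {
      List<String> batch;
      synchronized (lock) {
        while (!triggered && !stopped) {
          try {
            lock.wait();               // quiescent until the next trigger
          } catch (InterruptedException e) {
            return;
          }
        }
        if (!triggered) {
          return;                      // stopped with nothing pending
        }
        batch = inFlight;
      }
      // Commit outside the lock so the operator thread never waits on I/O.
      while (true) {
        try {
          sink.commitAll(batch);
          break;
        } catch (Exception e) {
          try {
            Thread.sleep(50);          // transaction rolled back: retry the whole batch
          } catch (InterruptedException ie) {
            return;
          }
        }
      }
      synchronized (lock) {
        batch.clear();
        triggered = false;
      }
    }
  }

  // Lets the worker finish any triggered batch, then stops it.
  void close() {
    synchronized (lock) {
      stopped = true;
      lock.notifyAll();
    }
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

One design choice worth noting: if the worker is still committing when the
next batch ends, tryHandOff() returns false and the records stay in the active
buffer, so the next successful hand-off commits both complete batches in one
transaction. If that is not acceptable for your use case, size the partitions
so that this case does not arise.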

Clearly, there are some tricky points to keep in mind:
1. The two threads need careful synchronization for this to work.
2. If the inbound data rate exceeds the rate at which data can be committed,
    you'll need to partition the output operator into as many replicas as
    needed to ensure that, in each partition, the worker thread completes
    before the second queue is ready.
3. Checkpointing can take a significant amount of time as the queues grow
    larger.
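A back-of-the-envelope version of point 2, assuming the inbound load is split
evenly across N partitions and that each partition's worker sustains roughly
the same commit rate C regardless of N (the external system may not scale that
way, so treat the result as a lower bound). With inbound rate R, each partition
sees R / N records per second, and its worker keeps up only if R / N <= C,
i.e. N >= R / C. PartitionPlanner and minPartitions are hypothetical names:

```java
// Hypothetical helper for point 2: assumes the load is split evenly across
// partitions and that each partition's worker sustains commitRecordsPerSec.
class PartitionPlanner {
  // Smallest N such that inboundRecordsPerSec / N <= commitRecordsPerSec.
  static int minPartitions(double inboundRecordsPerSec, double commitRecordsPerSec) {
    if (inboundRecordsPerSec < 0 || commitRecordsPerSec <= 0) {
      throw new IllegalArgumentException("rates must be non-negative and positive");
    }
    return Math.max(1, (int) Math.ceil(inboundRecordsPerSec / commitRecordsPerSec));
  }
}
```

For example, with 50,000 records/s arriving and about 12,000 records/s
committed per partition (illustrative numbers, not measurements),
minPartitions(50_000, 12_000) returns 5, since 50,000 / 12,000 is about 4.17.
Leaving some headroom above this minimum is prudent.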

Treat this as an exploratory first attempt that gives you a feel for operator
speeds, data rates, external system speeds, etc.

You can then explore other options (if necessary), such as replacing the
queues with another storage mechanism like *ManagedState*; there are examples
of operators using *ManagedState* under subdirectories of
https://github.com/apache/apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/


Ram

On Fri, Dec 23, 2016 at 4:56 PM, Apex User wrote:

> Dear Team,
>
> I am performing a batch operation. As per the use case, I cannot commit
> the transaction from the output operator to the external system until all
> of the batch data has been processed. Once all the data is processed, the
> output operator gets a bulk collection object as a single tuple. The output
> operator is then supposed to iterate over the collection and commit the
> records to the external system one by one.
>
> If the collection size is about 1K, everything works fine; however, as the
> size increases, the output operator dies. I am expecting about 100K records
> in the collection, which is delivered as a single tuple to the output
> operator. I understand that writing so many records in a single call to the
> process method is causing the issue. Can someone please suggest an approach
> to resolve this?
>
> public final transient DefaultInputPort<List<String>> input =
>     new DefaultInputPort<List<String>>()
> {
>   @Override
>   public void process(List<String> tuple)
>   {
>     // write each object to the external system one by one
>   }
> };
>
>
