In streaming there is a stream (surprise); in a space-constrained batch
case, we can have additional file writers/readers between the operators.

Modules can in fact be used to support pipeline reuse, but they must be
added/removed dynamically to support stages with on-demand resource
allocation.

Thomas


On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <[email protected]> wrote:

> Do you suggest that in a streaming use case the join operator also passes
> data downstream using files, or that there are two different join
> operators, one for streaming and one for batch? If not, it means that the
> join operator needs to emit data to a separate file output operator, so it
> still needs to read data from a temporary space before emitting. Why not
> emit directly to topN in this case?
>
> Is not pipeline reuse already supported by Apex modules?
>
> Thank you,
>
> Vlad
>
>
> On 4/10/17 06:59, Thomas Weise wrote:
>
>> I don't think this fully covers the scenario of limited resources. You
>> describe a case of 3 operators, but when you consider just 2 operators
>> that both have to hold a large data set in memory, then the suggested
>> approach won't work. Let's say the first operator is outer join and the
>> second operator topN. Both are blocking and cannot emit before all input
>> is seen.
>>
>> To deallocate the outer join, all results need to be drained. It's a
>> resource swap and you need a temporary space to hold the data. Also, if
>> the requirement is to be able to recover and retry from results of stage
>> one, then you need a fault tolerant swap space. If the cluster does not
>> have enough memory, then disk is a good option (SLA vs. memory tradeoff).
>>
>> I would also suggest thinking beyond the single DAG scenario. Often users
>> need to define pipelines that are composed of multiple smaller flows
>> (which they may also want to reuse in multiple pipelines). APEXCORE-408
>> gives you an option to compose such flows within a single Apex
>> application, in addition to covering the simplified use case that we
>> discuss there.
>>
>> Thomas
>>
>>
>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <[email protected]>
>> wrote:
>>
>>> It is exactly the same use case with the exception that it is not
>>> necessary to write data to files. Consider 3 operators: an input
>>> operator, an aggregate operator and an output operator. When the
>>> application starts, the output port of the aggregate operator should be
>>> in the closed state, the stream between the second and the third would
>>> be inactive and the output operator does not need to be allocated. After
>>> the input operator processes all data, it can close the output port and
>>> the input operator may be de-allocated. Once the aggregator receives EOS
>>> on its input port, it should open the output port and start writing to
>>> it. At this point, the output operator needs to be deployed and the
>>> stream between the last two operators (aggregator and output) becomes
>>> active.
>>>
>>> In a real batch use case, it is preferable to have the full application
>>> DAG statically defined and to delegate activation/de-activation of
>>> stages to the platform. It is also preferable not to write intermediate
>>> files to disk/HDFS, but instead pass data in-memory.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>
>>>> You would need to provide more specifics of the use case you are
>>>> thinking to address to make this a meaningful discussion.
>>>>
>>>> An example for APEXCORE-408 (based on real batch use case): I have two
>>>> stages, first stage produces a set of files that second stage needs as
>>>> input. Stage 1 operators are to be released and stage 2 operators
>>>> deployed when stage 2 starts. These can be independent operators, they
>>>> don't need to be connected through a stream.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <[email protected]>
>>>> wrote:
>>>>
>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>> address the same use case - how to re-allocate resources for batch
>>>>> applications or applications where processing happens in stages. The
>>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>>> complexity from application logic to the platform. IMO, supporting
>>>>> batch applications using APEXCORE-408 will require more coding on the
>>>>> application side.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>
>>>>>> I think this needs more input on a use case level. The ability to
>>>>>> dynamically alter the DAG internally will also address the resource
>>>>>> allocation for operators:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>
>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>> flexible in general. Considering the likely implementation complexity
>>>>>> for the proposed feature, I would like to understand what benefits it
>>>>>> provides to the user (use cases that cannot be addressed otherwise)?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>>> operator on the closed port.
>>>>>>>
>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>> active, and after a port is opened, the stream may still be inactive
>>>>>>> (not yet ready).
>>>>>>>
>>>>>>> The more contributors participate in the discussion and
>>>>>>> implementation, the more solid the feature will be.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Vlad
>>>>>>>
>>>>>>> Sent from iPhone
>>>>>>>
>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>>>> and idempotency. Close stream would need to stop accepting new data
>>>>>>>> but still can't actually close all the streams and un-deploy
>>>>>>>> operators till committed. Idempotency might require the close stream
>>>>>>>> to take effect at the end of the window. What would it then mean for
>>>>>>>> re-opening streams within a window? Also, this looks like a larger
>>>>>>>> undertaking; as Ram suggested, it would be good to understand the
>>>>>>>> use cases, and I also suggest that multiple folks participate in the
>>>>>>>> implementation effort to ensure that we are able to address all the
>>>>>>>> scenarios and minimize chances of regression in existing behavior.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> All,
>>>>>>>>>
>>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>>> output port and all streams defined by a DAG are active. I'd like
>>>>>>>>> to propose an ability for an operator to open and close output
>>>>>>>>> ports. By default all ports defined by an operator will be open.
>>>>>>>>> In the case an operator for any reason decides that it will not
>>>>>>>>> emit tuples on the output port, it may close it. This will make
>>>>>>>>> the stream inactive and the application master may undeploy the
>>>>>>>>> downstream (for that input stream) operators. If this leads to
>>>>>>>>> containers that don't have any active operators, those containers
>>>>>>>>> may be undeployed as well, leading to better cluster resource
>>>>>>>>> utilization and better Apex elasticity. Later, the operator may be
>>>>>>>>> in a state where it needs to emit tuples on the closed port. In
>>>>>>>>> this case, it needs to re-open the port and wait till the stream
>>>>>>>>> becomes active again before emitting tuples on that port. Making
>>>>>>>>> an inactive stream active again requires the application master to
>>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>
>>>>>>>>
>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>> streams as inactive when an application starts. This will allow
>>>>>>>>> the application master to avoid reserving all containers when the
>>>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>>>> stream becomes active.
>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
