Yes, but the source is not important for resource allocation because it is a reader that does not hold a lot of resources. The big-ticket items are join and topN, and without a swap space they need to be allocated at the same time.
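A rough sketch of the arithmetic behind this point, in Python. The footprints are made-up numbers and the phase lists are my own reading of the pipelines quoted below, not anything measured or taken from Apex. Each pipeline is modeled as a sequence of phases, and only the operators listed in a phase are allocated during that phase.

```python
from typing import Dict, List

# Hypothetical memory footprints in GB, chosen only for illustration.
FOOTPRINT_GB: Dict[str, int] = {
    "source": 1, "reader": 1, "writer": 1, "sink": 1,
    "join": 40, "topN": 30,
}

Phases = List[List[str]]


def peak_memory(phases: Phases) -> int:
    """Return the largest total footprint over phases that run one after another."""
    return max(sum(FOOTPRINT_GB[op] for op in phase) for phase in phases)


# (source -> join -> topN -> sink), everything allocated up front.
FUSED_EAGER: Phases = [["source", "join", "topN", "sink"]]

# Same pipeline, but operators are released and deployed on demand:
# join and topN still overlap while join drains its results into topN.
FUSED_ON_DEMAND: Phases = [["source", "join"], ["join", "topN"], ["topN", "sink"]]

# (source -> join -> writer) - - -> (reader -> topN -> sink), with files as swap space.
STAGED_WITH_FILES: Phases = [["source", "join", "writer"], ["reader", "topN", "sink"]]

for name, phases in [
    ("fused, eager", FUSED_EAGER),
    ("fused, on demand", FUSED_ON_DEMAND),
    ("staged with files", STAGED_WITH_FILES),
]:
    print(f"{name}: peak {peak_memory(phases)} GB")
```

With these assumed numbers, releasing the source early only lowers the peak from 72 GB to 70 GB, while splitting join and topN into separate stages lowers it to 42 GB.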
On Mon, Apr 10, 2017 at 8:56 AM, Vlad Rozov <[email protected]> wrote:
> For the second pipeline, source can be de-allocated as soon as join gets all data and join can be de-allocated as soon as topN gets all data. Note that topN (and sink) does not need to be allocated before join starts emitting data.
>
> Thank you,
> Vlad
>
> On 4/10/17 08:48, Thomas Weise wrote:
>> The pipeline depends on the resource availability. It could be:
>>
>> ( source -> join -> writer ) - - -> ( reader -> topN -> sink)
>>
>> or
>>
>> (source -> join -> topN -> sink)
>>
>> The second case does not allow you to deallocate join (join and topN are active at the same time).
>>
>> On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <[email protected]> wrote:
>>> It is important. The generic pipeline proposed is (... -> writer) ---> (reader -> join -> writer) ---> (reader -> ...), where reader -> aggregator -> writer becomes a common pattern for a single stage processing.
>>>
>>> Thank you,
>>> Vlad
>>>
>>> On 4/10/17 08:31, Thomas Weise wrote:
>>>> Where the data comes from isn't important for this discussion. The scenario is join -> topN
>>>>
>>>> With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )
>>>>
>>>> On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <[email protected]> wrote:
>>>>> In your example join is both consumer and producer, is not it? Where does it get data from? Join is not an input operator.
>>>>>
>>>>> Thank you,
>>>>> Vlad
>>>>>
>>>>> On 4/10/17 08:13, Thomas Weise wrote:
>>>>>> In this example join/writer produces the data, reader/topN consumes. You cannot deallocate producer before all data has been drained.
>>>>>> When using files, join/writer can be deallocated when all data was flushed to the files and allocation of consumer can wait until that occurred, if the space isn't available to have both of them active at same time.
>>>>>>
>>>>>> Overall it seems this is not a matter of activating/deactivating streams but operators.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>> On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <[email protected]> wrote:
>>>>>>> With additional file readers/writers the pipeline of a single stage becomes the 3 operator use case I described. With ability to open/close ports, platform can optimize it by re-allocating resources from readers to writers.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Vlad
>>>>>>>
>>>>>>> On 4/10/17 07:44, Thomas Weise wrote:
>>>>>>>> In streaming there is a stream (surprise), in a space constraint batch case, we can have additional file writers/readers between the operators.
>>>>>>>>
>>>>>>>> Modules can in fact be used to support pipeline reuse, but they must be added/removed dynamically to support stages with on-demand resource allocation.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>> Do you suggest that in a streaming use case join operator also pass data to downstream using files or that there are two different join operators one for streaming and one for batch? If not, it means that the join operator needs to emit data to a separate file output operator, so it still needs to read data from a temporary space before emitting, why not to emit directly to topN in this case?
>>>>>>>>>
>>>>>>>>> Is not pipeline reuse already supported by Apex modules?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>>>>>>>> I don't think this fully covers the scenario of limited resources. You describe a case of 3 operators, but when you consider just 2 operators that both have to hold a large data set in memory, then the suggested approach won't work. Let's say the first operator is outer join and the second operator topN. Both are blocking and cannot emit before all input is seen.
>>>>>>>>>>
>>>>>>>>>> To deallocate the outer join, all results need to be drained. It's a resource swap and you need a temporary space to hold the data. Also, if the requirement is to be able to recover and retry from results of stage one, then you need a fault tolerant swap space. If the cluster does not have enough memory, then disk is a good option (SLA vs. memory tradeoff).
>>>>>>>>>>
>>>>>>>>>> I would also suggest to think beyond the single DAG scenario. Often users need to define pipelines that are composed of multiple smaller flows (which they may also want to reuse in multiple pipelines). APEXCORE-408 gives you an option to compose such flows within a single Apex application, in addition to covering the simplified use case that we discuss there.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>>>> It is exactly the same use case with the exception that it is not necessary to write data to files. Consider 3 operators, an input operator, an aggregate operator and an output operator. When the application starts, the output port of the aggregate operator should be in the closed state, the stream between the second and the third would be inactive and the output operator does not need to be allocated. After the input operator processes all data, it can close the output port and the input operator may be de-allocated. Once the aggregator receives EOS on its input port, it should open the output port and start writing to it. At this point, the output operator needs to be deployed and the stream between the last two operators (aggregator and output) becomes active.
>>>>>>>>>>>
>>>>>>>>>>> In a real batch use case, it is preferable to have full application DAG to be statically defined and delegate to platform activation/de-activation of stages. It is also preferable not to write intermediate files to disk/HDFS, but instead pass data in-memory.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>>>>>>>> You would need to provide more specifics of the use case you are thinking to address to make this a meaningful discussion.
>>>>>>>>>>>>
>>>>>>>>>>>> An example for APEXCORE-408 (based on real batch use case): I have two stages, first stage produces a set of files that second stage needs as input. Stage 1 operators to be released and stage 2 operators deployed when stage 2 starts. These can be independent operators, they don't need to be connected through a stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>>>>>> It is not about a use case difference. My proposal and APEXCORE-408 address the same use case - how to re-allocate resources for batch applications or applications where processing happens in stages. The difference between APEXCORE-408 and the proposal is shift in complexity from application logic to the platform. IMO, supporting batch applications using APEXCORE-408 will require more coding on the application side.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>>>>>>>> I think this needs more input on a use case level. The ability to dynamically alter the DAG internally will also address the resource allocation for operators:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It can be used to implement stages of a batch pipeline and is very flexible in general.
>>>>>>>>>>>>>> Considering the likely implementation complexity for the proposed feature I would like to understand what benefits it provides to the user (use cases that cannot be addressed otherwise)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>>>>>>>> Correct, a stateful downstream operator can only be undeployed at a checkpoint window after it consumes all data emitted by upstream operator on the closed port.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It will be necessary to distinguish between closed port and inactive stream. After port is closed, stream may still be active and after port is open, stream may still be inactive (not yet ready).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The more contributors participate in the discussion and implementation, the more solid the feature will be.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sent from iPhone
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <[email protected]> wrote:
>>>>>>>>>>>>>>>> Generally a good idea. Care should be taken around fault tolerance and idempotency.
>>>>>>>>>>>>>>>> Close stream would need to stop accepting new data but still can't actually close all the streams and un-deploy operators till committed. Idempotency might require the close stream to take effect at the end of the window. What would it then mean for re-opening streams within a window? Also, looks like a larger undertaking, as Ram suggested would be good to understand the use cases and I also suggest that multiple folks participate in the implementation effort to ensure that we are able to address all the scenarios and minimize chances of regression in existing behavior.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <[email protected]> wrote:
>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined output port and all streams defined by a DAG are active. I'd like to propose an ability for an operator to open and close output ports. By default all ports defined by an operator will be open.
>>>>>>>>>>>>>>>>> In the case an operator for any reason decides that it will not emit tuples on the output port, it may close it. This will make the stream inactive and the application master may undeploy the downstream (for that input stream) operators. If this leads to containers that don't have any active operators, those containers may be undeployed as well leading to better cluster resource utilization and better Apex elasticity. Later, the operator may be in a state where it needs to emit tuples on the closed port. In this case, it needs to re-open the port and wait till the stream becomes active again before emitting tuples on that port. Making inactive stream active again, requires the application master to re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It should be also possible for an application designer to mark streams as inactive when an application starts. This will allow the application master avoid reserving all containers when the application starts. Later, the port can be open and inactive stream become active.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>>>> Vlad
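
For reference, the distinction made in the thread between a closed port and an inactive stream can be sketched as two independent flags. This is only a toy Python model of the proposal as quoted above: the class and function names are hypothetical and none of this is Apex API.

```python
from dataclasses import dataclass


@dataclass
class Stream:
    """Toy model of one stream; not an Apex class."""
    port_open: bool = True  # the upstream operator's decision (open by default)
    active: bool = True     # whether the downstream side is deployed and connected

    def can_emit(self) -> bool:
        # Tuples may flow only when the port is open AND the stream is active.
        return self.port_open and self.active


def close_port(s: Stream) -> None:
    # Closing the port does not deactivate the stream: downstream must still drain.
    s.port_open = False


def downstream_drained(s: Stream) -> None:
    if s.port_open:
        raise RuntimeError("port must be closed before the stream is deactivated")
    # Only now could downstream operators be undeployed.
    s.active = False


def open_port(s: Stream) -> None:
    # Opening the port does not activate the stream: downstream may not be ready yet.
    s.port_open = True


def downstream_deployed(s: Stream) -> None:
    s.active = True


# Walk-through of the aggregator -> output stream from the 3 operator example:
# it starts closed and inactive, is opened after EOS, and is later shut down again.
agg_to_out = Stream(port_open=False, active=False)
states = [agg_to_out.can_emit()]
for step in (open_port, downstream_deployed, close_port, downstream_drained):
    step(agg_to_out)
    states.append(agg_to_out.can_emit())
print(states)
```

The walk-through emits only in the one state where the port is open and the downstream operator has been deployed, which is the waiting behavior the proposal describes for re-opened ports.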
