Becket, is FLIP-27 still on track to be released in 1.10?

On Tue, Jan 7, 2020 at 7:04 PM Becket Qin <becket....@gmail.com> wrote:
Hi folks,

Happy new year!

Stephan and I chatted offline yesterday. After reading the email thread again, I found that I had misunderstood Dawid's original proposal regarding the behavior of env.source(BoundedSource) and had an incorrect impression about the behavior of Java covariant return types. Anyway, I agree that what Dawid originally proposed makes sense, which is the following API:

    // Return a BoundedDataStream instance if the source is bounded.
    // Return a DataStream instance if the source is unbounded.
    DataStream env.source(Source);

    // Throws an exception if the source is unbounded.
    // Used when users know the source is bounded at programming time.
    BoundedDataStream env.boundedSource(Source);

A BoundedDataStream only runs in batch execution mode. A DataStream only runs in streaming execution mode.

To run a bounded source in streaming execution mode, one would do the following:

    // Return a DataStream instance with a source that will stop at some point;
    DataStream env.source(SourceUtils.asUnbounded(myBoundedSource));

I'll update the FLIP wiki and resume the vote if there are no further concerns.

Apologies for the misunderstanding and thanks for all the patient discussions.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 23, 2019 at 8:00 AM Becket Qin <becket....@gmail.com> wrote:

Hi Steven,

I think the current proposal is what you mentioned - a Kafka source that can be constructed in either BOUNDED or UNBOUNDED mode. And Flink can get the boundedness by invoking getBoundedness().

So one can create a Kafka source by doing something like the following:

    new KafkaSource().startOffset().endOffset(); // A bounded instance.
    new KafkaSource().startOffset(); // An unbounded instance.

If users want an UNBOUNDED Kafka source that stops at some point, they can wrap the BOUNDED Kafka source like below:

    SourceUtils.asUnbounded(new KafkaSource().startOffset().endOffset());

The wrapped source would be an unbounded Kafka source that stops at the end offset.

Does that make sense?

Thanks,

Jiangjie (Becket) Qin
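For illustration only, a minimal sketch of what such a SourceUtils.asUnbounded() wrapper could look like, assuming a cut-down Source interface that exposes nothing but boundedness (the real FLIP-27 Source also has reader/enumerator factory methods that a complete wrapper would delegate):

    // Simplified stand-ins for the FLIP-27 types, for illustration only.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    interface Source<T> {
        Boundedness getBoundedness();
        // the real interface also has createReader/createEnumerator/etc.
    }

    final class SourceUtils {
        private SourceUtils() {}

        // Wraps a BOUNDED source so the runtime treats it as an unbounded
        // source that simply happens to stop when the data runs out.
        static <T> Source<T> asUnbounded(Source<T> bounded) {
            if (bounded.getBoundedness() != Boundedness.BOUNDED) {
                throw new IllegalArgumentException("Expected a BOUNDED source");
            }
            // In the real interface, all other methods would delegate to `bounded`.
            return () -> Boundedness.CONTINUOUS_UNBOUNDED;
        }
    }

The point of the wrapper is that only the reported boundedness changes; the reading logic of the wrapped source stays exactly the same.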
On Fri, Dec 20, 2019 at 1:31 PM Jark Wu <imj...@gmail.com> wrote:

Hi,

First of all, I think it is not called "UNBOUNDED"; according to FLIP-27, it is called "CONTINUOUS_UNBOUNDED". And the description of Boundedness in FLIP-27 [1] declares clearly what Becket and I think:

    public enum Boundedness {

        /**
         * A bounded source processes the data that is currently available and
         * will end after that.
         *
         * <p>When a source produces a bounded stream, the runtime may activate
         * additional optimizations that are suitable only for bounded input.
         * Incorrectly producing unbounded data when the source is set to
         * produce a bounded stream will often result in programs that do not
         * output any results and may eventually fail due to runtime errors
         * (out of memory or storage).
         */
        BOUNDED,

        /**
         * A continuous unbounded source continuously processes all data as it
         * comes.
         *
         * <p>The source may run forever (until the program is terminated) or
         * might actually end at some point, based on some source-specific
         * conditions. Because that is not transparent to the runtime, the
         * runtime will use an execution mode for continuous unbounded streams
         * whenever this mode is chosen.
         */
        CONTINUOUS_UNBOUNDED
    }

Best,
Jark

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP-27:RefactorSourceInterface-Source

On Fri, 20 Dec 2019 at 12:55, Steven Wu <stevenz...@gmail.com> wrote:

Becket,

Regarding "UNBOUNDED source that stops at some point", I found it difficult to grasp what UNBOUNDED really means.

If we want to use a Kafka source with an end/stop time, I guess you would call it an UNBOUNDED Kafka source that stops (aka BOUNDED-streaming). The terminology is a little confusing to me. Maybe BOUNDED/UNBOUNDED shouldn't be used to categorize the source. Just call it a Kafka source and it can run in either BOUNDED or UNBOUNDED mode.

Thanks,
Steven

On Thu, Dec 19, 2019 at 7:02 PM Becket Qin <becket....@gmail.com> wrote:

I had an offline chat with Jark, and here are some more thoughts:

1. From the SQL perspective, a BOUNDED source leads to the batch execution mode, an UNBOUNDED source leads to the streaming execution mode.
2. The semantic of an UNBOUNDED source is "may or may not stop". The semantic of a BOUNDED source is "will stop".
3. The semantic of DataStream is "may or may not terminate". The semantic of BoundedDataStream is "will terminate".

Given that, option 3 seems the better option because:

1. SQL already has a strict binding between boundedness and execution mode. Letting DataStream be consistent would be good.
2. The semantic of an UNBOUNDED source is exactly the same as DataStream. So we should avoid breaking such semantics, i.e. turning some DataStream from "may or may not terminate" to "will terminate".

For cases where users want the BOUNDED-streaming combination, they can simply use an UNBOUNDED source that stops at some point. We can even provide a simple wrapper to wrap a BOUNDED source as an UNBOUNDED source if that helps. But API wise, option 3 seems to tell a pretty good whole story.

Thanks,

Jiangjie (Becket) Qin

On Thu, Dec 19, 2019 at 10:30 PM Becket Qin <becket....@gmail.com> wrote:

Hi Timo,

> Bounded is just a special case of unbounded and every bounded source can
> also be treated as an unbounded source. This would unify the API if
> people don't need a bounded operation.

With option 3 users can still get a unified API with something like below:

    DataStream boundedStream = env.boundedSource(boundedSource);
    DataStream unboundedStream = env.source(unboundedSource);

So in both cases, users can still use a unified DataStream without touching the bounded-stream-only methods.
Regarding "unify the API if people don't need the bounded operation": do you expect a DataStream with a bounded source to have the batch operators and scheduler settings as well?

If we allow DataStream from a BOUNDED source, we will essentially pick "*modified option 2*":

    // The source is either bounded or unbounded, but only unbounded
    // operations could be performed on the returned DataStream.
    DataStream<Type> dataStream = env.source(someSource);

    // The source must be a bounded source, otherwise an exception is thrown.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);

    // Add the following method to DataStream
    Boundedness DataStream#getBoundedness();

From a purely logical perspective, boundedness and runtime settings (stream/batch) are two orthogonal dimensions, and they are specified in the following way:

*Boundedness* - defined by the source: BOUNDED / UNBOUNDED.
*Running mode* - defined by the API class: DataStream (streaming mode) / BoundedDataStream (batch mode).

Excluding the UNBOUNDED-batch combination, the "*modified option 2*" covers the remaining three combinations. Compared with "*modified option 2*", the main benefit of option 3 is its simplicity and clearness, by tying boundedness to running mode and giving up the BOUNDED-streaming combination.

Just to be clear, I am fine with either option. But I would like to understand a bit more about the bounded-streaming use case, when users would prefer this over the bounded-batch case, and whether the added value justifies the additional complexity in the API. Two cases I can think of are:

1. The records in DataStream will be processed in order, while BoundedDataStream processes records without order guarantee.
2. DataStream emits intermediate results when processing a finite dataset, while BoundedDataStream only emits the final result.

Case 1 is actually misleading because DataStream in general doesn't really support in-order processing.
Case 2 seems a rare use case because the instantaneous intermediate result seems difficult to reason about. In any case, this can be supported by an UNBOUNDED source that stops at some point.

Are there other use cases for the bounded-streaming combination that I missed? I am a little hesitant to put the testing requirement here because ideally I'd avoid having public APIs for testing purposes only. And this could be resolved by having an UNBOUNDED source stopping at some point as well.

Sorry for the long discussion, but I would really like to make an API decision after knowing all the pros and cons.

Thanks,

Jiangjie (Becket) Qin
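To make the "a library may need to know" scenario concrete, a hedged sketch of a utility that branches on the proposed DataStream#getBoundedness(); all types here are minimal stand-ins, and summarize() is a made-up example:

    // Minimal stubs so the sketch compiles; the real types would live in Flink.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    interface DataStream<T> { Boundedness getBoundedness(); }

    final class LibrarySketch {
        // A library can branch on boundedness instead of requiring a
        // BoundedDataStream parameter type (illustrative only).
        static <T> void summarize(DataStream<T> input) {
            if (input.getBoundedness() == Boundedness.BOUNDED) {
                // input will end: buffer all records, emit one final result
            } else {
                // input may run forever: emit periodic/incremental results
            }
        }
    }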
On Thu, Dec 19, 2019 at 6:19 PM Timo Walther <twal...@apache.org> wrote:

Hi Becket,

regarding *Option 3* I think we can relax the constraints for env.source():

    // MySource can be bounded or unbounded
    DataStream<Type> dataStream = env.source(mySource);

    // MySource must be bounded, otherwise throws exception.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(mySource);

Bounded is just a special case of unbounded and every bounded source can also be treated as an unbounded source. This would unify the API if people don't need a bounded operation. It also addresses Jark's concerns.

Regards,
Timo
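A sketch of what the two entry points could check under this relaxed variant, with simplified, illustrative signatures (only the boundedness check matters here):

    // Minimal stubs for illustration.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    interface Source<T> { Boundedness getBoundedness(); }
    class DataStream<T> {}
    class BoundedDataStream<T> extends DataStream<T> {}

    class EnvironmentSketch {
        // Accepts bounded or unbounded sources alike; the returned stream
        // only exposes operations that are safe for unbounded input.
        <T> DataStream<T> source(Source<T> source) {
            return new DataStream<T>();
        }

        // Accepts only bounded sources; fails fast at job-graph
        // construction time otherwise.
        <T> BoundedDataStream<T> boundedSource(Source<T> source) {
            if (source.getBoundedness() != Boundedness.BOUNDED) {
                throw new IllegalArgumentException(
                        "boundedSource() requires a BOUNDED source, got "
                                + source.getBoundedness());
            }
            return new BoundedDataStream<T>();
        }
    }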
On 18.12.19 14:16, Becket Qin wrote:

Hi Jark,

Please see the reply below:

> Regarding option #3, my concern is that if we don't support streaming
> mode for bounded source, how could we create a testing source for
> streaming mode? Currently, all the testing sources for streaming are
> bounded, so that the integration test will finish finally.

An UNBOUNDED source does not mean it will never stop. It simply indicates that the source *may* run forever, so the runtime needs to be prepared for that, but the task may still stop at some point when it hits some source-specific condition. So an UNBOUNDED testing source can still stop at some point if needed.

> Regarding Source#getRecordOrder(), could we have an implicit contract
> that an unbounded source should already read in order (i.e. reading
> partitions in parallel), while for a bounded source the order is not
> mandatory. This is also the behavior of the current sources.
> 1) a source can't guarantee it reads in strict order, because the
> producer may produce data not in order.
> 2) *Bounded-StrictOrder* is not necessary, because batch can reorder data.

It is true that sometimes the source cannot guarantee the record order, but sometimes it can. Right now, even for stream processing, there is no processing order guarantee. For example, a join operator may emit a later record which successfully found a join match earlier. Event order is one of the most important requirements for event processing, so a clear order guarantee would be necessary. That said, I agree that right now, even if the sources provide the record order requirement, the runtime is not able to guarantee that out of the box. So I am OK if we add the record order to the Source later. But we should avoid misleading users into thinking the processing order is guaranteed when using the unbounded runtime.

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 18, 2019 at 10:29 AM Jark Wu <imj...@gmail.com> wrote:

Hi Becket,

That's great that we have reached a consensus on Source#getBoundedness().

Regarding option #3, my concern is that if we don't support streaming mode for bounded source, how could we create a testing source for streaming mode? Currently, all the testing sources for streaming are bounded, so that the integration test will finish finally.

Regarding Source#getRecordOrder(), could we have an implicit contract that an unbounded source should already read in order (i.e. reading partitions in parallel), while for a bounded source the order is not mandatory. This is also the behavior of the current sources.
1) a source can't guarantee it reads in strict order, because the producer may produce data not in order.
2) *Bounded-StrictOrder* is not necessary, because batch can reorder data.

Best,
Jark

On Tue, 17 Dec 2019 at 22:03, Becket Qin <becket....@gmail.com> wrote:

Hi folks,

Thanks for the comments. I am convinced that the Source API should not take boundedness as a parameter after it is constructed. What Timo and Dawid suggested sounds like a reasonable solution to me. So the Source API would become:

    Source {
        Boundedness getBoundedness();
    }

Assuming the above Source API, in addition to the two options mentioned in earlier emails, I am thinking of another option:

*Option 3:*

    // MySource must be unbounded, otherwise throws exception.
    DataStream<Type> dataStream = env.source(mySource);

    // MySource must be bounded, otherwise throws exception.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(mySource);

The pros of this API are:
    a) It fits the requirements from Table / SQL well.
    b) DataStream users still have type safety (option 2 only has partial type safety).
    c) Crystal clear boundedness from the API, which makes DataStream join / connect easy to reason about.
The caveats I see:
    a) It is inconsistent with Table since Table has one unified interface.
    b) No streaming mode for bounded source.
@Stephan Ewen <ewenstep...@gmail.com> @Aljoscha Krettek <aljos...@ververica.com> what do you think of the approach?

Orthogonal to the above API, I am wondering whether boundedness is the only dimension needed to describe the characteristic of the Source behavior. We may also need to have another dimension of *record order*.

For example, when a file source is reading from a directory with bounded records, it may have two ways to read:
1. Read files in parallel.
2. Read files in the chronological order.
In both cases, the file source is a bounded source. However, the processing requirement for downstream may be different. In the first case, the record processing and result emitting order does not matter, e.g. word count. In the second case, the records may have to be processed in the order they were read, e.g. change log processing.

If the Source only has a getBoundedness() method, the downstream processors would not know whether the records emitted from the Source should be processed in order or not. So combining the boundedness and record order, we will have four scenarios:

*Bounded-StrictOrder*: A segment of change log.
*Bounded-Random*: Batch Word Count.
*Unbounded-StrictOrder*: An infinite change log.
*Unbounded-Random*: Streaming Word Count.

Option 2 mentioned in the previous email was kind of trying to handle the Bounded-StrictOrder case by creating a DataStream from a bounded source, which actually does not work. It looks like we do not have strict order support in some operators at this point, e.g. join. But we may still want to add the semantic to the Source first, so later on we don't need to change all the source implementations, especially given that many of them will be implemented by 3rd parties.

Given that, we need another dimension of *Record Order* in the Source. More specifically, the API would become:

    Source {
        Boundedness getBoundedness();
        RecordOrder getRecordOrder();
    }

    public enum RecordOrder {
        /** The records in the DataStream must be processed in their
         * strict order for correctness. */
        STRICT,
        /** The records in the DataStream can be processed in arbitrary
         * order. */
        RANDOM;
    }

Any thoughts?

Thanks,

Jiangjie (Becket) Qin
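For illustration, how a planner or downstream operator might consume the two proposed properties together; the types are stubs and classify() is a made-up example mapping onto the four scenarios above:

    // Stubs matching the proposal above, for illustration only.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    enum RecordOrder { STRICT, RANDOM }
    interface Source<T> {
        Boundedness getBoundedness();
        RecordOrder getRecordOrder();
    }

    final class PlannerSketch {
        // The four scenarios from the discussion, as a planner might see them.
        static String classify(Source<?> source) {
            boolean bounded = source.getBoundedness() == Boundedness.BOUNDED;
            boolean strict = source.getRecordOrder() == RecordOrder.STRICT;
            if (bounded && strict) return "Bounded-StrictOrder: a segment of change log";
            if (bounded) return "Bounded-Random: batch word count";
            if (strict) return "Unbounded-StrictOrder: an infinite change log";
            return "Unbounded-Random: streaming word count";
        }
    }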
On Tue, Dec 17, 2019 at 3:44 PM Timo Walther <twal...@apache.org> wrote:

Hi Becket,

I completely agree with Dawid's suggestion. The information about the boundedness should come out of the source, because most of the streaming sources can be made bounded based on some connector-specific criterion. In Kafka, it would be an end offset or end timestamp, but in any case having just an env.boundedSource() is not enough, because the parameters for making the source bounded are missing.

I suggest to have a simple `isBounded(): Boolean` flag in every source that might be influenced by a connector builder, as Dawid mentioned.

For type safety during programming, we can still go with *Final state 1*, by having an env.source() vs env.boundedSource(). The latter would just enforce that the boolean flag is set to `true` and could make bounded operations available (if we need that actually).

However, I don't think that we should start making a unified Table API ununified again. Boundedness is an optimization property. Every bounded operation can also be executed in an unbounded way using updates/retraction or watermarks.

Regards,
Timo

On 15.12.19 14:22, Becket Qin wrote:

Hi Dawid and Jark,

I think the discussion ultimately boils down to the question of which one of the following two final states we want. Once we make this decision, everything else can be naturally derived.

*Final state 1*: Separate APIs for bounded / unbounded DataStream & Table. That means any code users write will be valid at the point when they write the code. This is similar to having a type safety check at programming time. For example:

    BoundedDataStream extends DataStream {
        // Operations only available for bounded data.
        BoundedDataStream sort(...);

        // Interaction with another BoundedStream returns a bounded stream.
        BoundedJoinedDataStream join(BoundedDataStream other)

        // Interaction with another unbounded stream returns an unbounded stream.
        JoinedDataStream join(DataStream other)
    }

    BoundedTable extends Table {
        // Bounded only operation.
        BoundedTable sort(...);

        // Interaction with another BoundedTable returns a BoundedTable.
        BoundedTable join(BoundedTable other)

        // Interaction with another unbounded table returns an unbounded table.
        Table join(Table other)
    }
*Final state 2*: One unified API for bounded / unbounded DataStream / Table. That unified API may throw an exception at DAG compilation time if an invalid operation is attempted. This is what the Table API currently follows.

    DataStream {
        // Throws exception if the DataStream is unbounded.
        DataStream sort();
        // Get boundedness.
        Boundedness getBoundedness();
    }

    Table {
        // Throws exception if the table has infinite rows.
        Table orderBy();

        // Get boundedness.
        Boundedness getBoundedness();
    }
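For comparison, the DAG-compilation-time check that final state 2 implies could look roughly like this; sort() is used as the example bounded-only operation and everything else is a stub:

    // Stubs for illustration; only the check matters here.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    class DataStream<T> {
        private final Boundedness boundedness;

        DataStream(Boundedness boundedness) { this.boundedness = boundedness; }

        Boundedness getBoundedness() { return boundedness; }

        // Valid Java on any DataStream at compile time, but rejected when
        // the job graph is built from an unbounded stream.
        DataStream<T> sort() {
            if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
                throw new IllegalStateException(
                        "sort() is only supported on bounded streams");
            }
            return this; // a real implementation would add a sort transformation
        }
    }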
From what I understand, there is no consensus so far on this decision yet. Whichever final state we choose, we need to make it consistent across the entire project. We should avoid the case where Table follows one final state while DataStream follows another. Some arguments I am aware of from both sides so far are the following:

Arguments for final state 1:
1a) Clean API with method safety check at programming time.
1b) (Counter to 2b) Although SQL does not have programming-time error checks, SQL is not really a "programming language" per se. So SQL can be different from Table and DataStream.
1c) Although final state 2 seems to make it easier for SQL to use, given it is more "config based" than "parameter based", final state 1 can probably also meet what SQL wants by wrapping the Source in the TableSource / TableSourceFactory API if needed.

Arguments for final state 2:
2a) The Source API itself seems already sort of following the unified API pattern.
2b) There is no "programming time" method error check in the SQL case, so we cannot really achieve final state 1 across the board.
2c) It is an easier path given our current status, i.e. Table is already following final state 2.
2d) Users can always explicitly check the boundedness if they want to.

As I mentioned earlier, my initial thought was also to have a "configuration based" Source rather than a "parameter based" Source. So it is completely possible that I missed some important consideration or design principles that we want to enforce for the project. It would be good if @Stephan Ewen <step...@ververica.com> and @Aljoscha Krettek <aljos...@ververica.com> could also provide more thoughts on this.

Re: Jingsong

> As you said, there are some batched system sources, like parquet/orc
> source. Could we have the batch emit interface to improve performance?
> The queue of per record may cause performance degradation.

The current interface does not necessarily cause a performance problem in a multi-threading case. In fact, the base implementation allows SplitReaders to add a batch <E> of records <T> to the records queue <E>, so each element in the records queue would be a batch <E>. In this case, when the main thread polls records, it will take a batch <E> of records <T> from the shared records queue and process the records <T> in a batch manner.

Thanks,

Jiangjie (Becket) Qin
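A sketch of the batched hand-over Becket describes above, with a queue whose elements are whole batches rather than individual records; the threading roles follow the description, and the types are placeholders:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;

    final class BatchedHandover<T> {
        // Each queue element is one batch, so the synchronization cost is
        // paid once per batch rather than once per record.
        private final BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();

        // Called from the SplitReader's fetching thread.
        void addBatch(List<T> batch) throws InterruptedException {
            queue.put(batch);
        }

        // Called from the main thread: take one batch, emit all its records.
        void pollAndEmit(Consumer<T> output) throws InterruptedException {
            List<T> batch = queue.take();
            for (T record : batch) {
                output.accept(record);
            }
        }
    }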
On Thu, Dec 12, 2019 at 1:29 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Becket,

I also have some performance concerns.

If I understand correctly, SourceOutput will emit data per record into the queue? I'm worried about the multithreading performance of this queue.

> One example is some batched messaging systems which only have an offset
> for the entire batch instead of individual messages in the batch.

As you said, there are some batched system sources, like parquet/orc source. Could we have the batch emit interface to improve performance? The queue of per record may cause performance degradation.

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 9:15 AM Jark Wu <imj...@gmail.com> wrote:

Hi Becket,

I think Dawid explained things clearly and makes a lot of sense. I'm also in favor of #2, because #1 doesn't work for our future unified environment.

You can see the vision in this documentation [1]. In the future, we would like to drop the global streaming/batch mode in SQL (i.e. EnvironmentSettings#inStreamingMode/inBatchMode). A source is bounded or unbounded once defined, so queries can be inferred from the source to run in streaming or batch or hybrid mode. However, in #1, we will lose this ability because the framework doesn't know whether the source is bounded or unbounded.

Best,
Jark

[1]: https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p

On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

Regarding:

    Collection<E> getNextRecords()

I'm pretty sure such a design would unfortunately impact the performance (accessing and potentially creating the collection on the hot path).

Also:

    InputStatus emitNext(DataOutput<T> output) throws Exception;
    or
    Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

gives us some opportunities in the future to allow the Source to hot loop inside, until it receives some signal "please exit because of some reasons" (the output collector could return such a hint upon collecting the result). But that's another topic outside of this FLIP's scope.

Piotrek
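The two reader shapes under discussion, condensed into interfaces to show where the difference lies; the signatures are taken from the thread, while the InputStatus values and DataOutput shape are illustrative:

    import java.util.Collection;

    // Illustrative condensation of the two designs discussed above.
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    interface DataOutput<T> { void emitRecord(T record); }

    // Pull style: the caller emits; a Collection may be allocated on the
    // hot path of every call.
    interface PullStyleReader<T> {
        Collection<T> getNextRecords() throws Exception;
    }

    // Push style: the reader emits into a reusable output and reports a
    // status, which also leaves room for a future "please exit" hint.
    interface PushStyleReader<T> {
        InputStatus emitNext(DataOutput<T> output) throws Exception;
    }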
On 11 Dec 2019, at 10:41, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Becket,

a quick clarification from my side, because I think you misunderstood my question. I did not suggest letting the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records.

Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

Issue #1 - Design of Source interface

I mentioned the lack of a method like Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) because without it the current proposal is not complete/does not work.

If we say that boundedness is an intrinsic property of a source, imo we don't need the Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) method.

Assuming a source from my previous example:

    Source source = KafkaSource.builder()
        ...
        .untilTimestamp(...)
        .build()

Would the enumerator differ if created like source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this is the part where my opinion differs the most from the current proposal. I really think it should always be the source that tells if it is bounded or not. In the current proposal the methods continousSource/boundedSource somewhat reconfigure the source, which I think is misleading.

I think a call like:

    Source source = KafkaSource.builder()
        ...
        .readContinously() / readUntilLatestOffset() / readUntilTimestamp / readUntilOffsets / ...
        .build()

is way cleaner (and more expressive) than:

    Source source = KafkaSource.builder()
        ...
        .build()
    env.continousSource(source) // which actually underneath would call
                                // createEnumerator(CONTINUOUS, ctx), equivalent to
                                // source.readContinously().createEnumerator(ctx)
    // or
    env.boundedSource(source)   // which actually underneath would call
                                // createEnumerator(BOUNDED, ctx), equivalent to
                                // source.readUntilLatestOffset().createEnumerator(ctx)

Sorry for the comparison, but to me it seems there is too much magic happening underneath those two calls.

I really believe the Source interface should have a getBoundedness method instead of (supportBoundedness) + createEnumerator(Boundedness, ...).
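A sketch of the builder style described above, where the chosen read/stop condition fixes the boundedness at build time; the method names are the hypothetical ones from the message (spelling kept as written), and the rest is filled in for illustration:

    // Stubs, for illustration only.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    interface Source<T> { Boundedness getBoundedness(); }

    final class KafkaSourceBuilder<T> {
        private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        private long endTimestamp = Long.MAX_VALUE; // illustrative stop condition

        KafkaSourceBuilder<T> readContinously() {
            this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
            return this;
        }

        KafkaSourceBuilder<T> untilTimestamp(long timestamp) {
            this.endTimestamp = timestamp;
            this.boundedness = Boundedness.BOUNDED; // a stop condition makes it bounded
            return this;
        }

        Source<T> build() {
            // The built source reports the boundedness fixed at build time,
            // so createEnumerator() would not need a Boundedness parameter.
            Boundedness decided = this.boundedness;
            return () -> decided;
        }
    }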
Issue #2 - Design of ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed, I am slightly in favor of option #2 modified. Yes, I am aware every step of the DAG would have to be able to say if it is bounded or not. I have a feeling it would be easier to express cross bounded/unbounded operations, but I must admit I have not thought it through thoroughly. In the spirit of "batch is just a special case of streaming" I thought BoundedStream would extend from DataStream. Correct me if I am wrong. In such a setup the cross bounded/unbounded operation could be expressed quite easily I think:

    DataStream {
        // We could not really tell if the result is bounded or not, but
        // because a bounded stream is a special case of unbounded, the API
        // object is correct, irrespective of whether the left or right side
        // of the join is bounded.
        DataStream join(DataStream, ...);
    }

    BoundedStream extends DataStream {
        // Only if both sides are bounded can the result be bounded as well.
        // However we do have access to DataStream#join here, so you can
        // still join with a DataStream.
        BoundedStream join(BoundedStream, ...);
    }

On the other hand, I also see benefits of two completely disjoint APIs, as we could prohibit some streaming calls in the bounded API. I can't think of any unbounded operators that could not be implemented for a bounded stream.

Besides, I think we both agree we don't like the method:

    DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we? :)

Best,

Dawid

On 10/12/2019 18:57, Becket Qin wrote:

Hi folks,

Thanks for the discussion, great feedback. Also thanks Dawid for the explanation, it is much clearer now.

One thing that is indeed missing from the FLIP is how the boundedness is passed to the Source implementation. So the API should be:

    Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)

And we can probably remove the Source#supportBoundedness(Boundedness boundedness) method.

Assuming we have that, we are essentially choosing from one of the following two options:

Option 1:

    // The source is a continuous source, and only unbounded operations
    // can be performed.
    DataStream<Type> datastream = env.continuousSource(someSource);

    // The source is a bounded source, and both bounded and unbounded
    // operations can be performed.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);

- Pros:
    a) Explicit boundary between bounded / unbounded streams; it is quite simple and clear to the users.
- Cons:
    a) For applications that do not involve bounded operations, they still have to call different APIs to distinguish bounded / unbounded streams.
    b) No support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
Option 2:

    // The source is either bounded or unbounded, but only unbounded
    // operations could be performed on the returned DataStream.
    DataStream<Type> dataStream = env.source(someSource);

    // The source must be a bounded source, otherwise an exception is thrown.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);

The pros and cons are exactly the opposite of option 1.
- Pros:
    a) For applications that do not involve bounded operations, they do not have to call different APIs to distinguish bounded / unbounded streams.
    b) Support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
- Cons:
    a) Bounded / unbounded streams are kind of mixed, i.e. given a DataStream, it is not clear whether it is bounded or not, unless you have access to its source.

If we only think from the Source API perspective, option 2 seems the better choice because functionality-wise it is a superset of option 1, at the cost of some seemingly acceptable ambiguity in the DataStream API. But if we look at the DataStream API as a whole, option 1 seems the clearer choice. For example, sometimes a library may have to know whether a certain task will finish or not, and it would be difficult to tell if the input is a DataStream, unless additional information is provided all the way from the Source. One possible solution is to have a *modified option 2* which adds a method to the DataStream API to indicate boundedness, such as getBoundedness(). It would solve the problem with a potential confusion of what the difference is between a DataStream with getBoundedness()=BOUNDED and a BoundedDataStream. But that seems not super difficult to explain.
So from the API's perspective, I don't have a strong opinion between *option 1* and *modified option 2*. I like the cleanness of option 1, but modified option 2 would be more attractive if we have a concrete use case for the "bounded stream with unbounded streaming runtime settings".

Re: Till

> Maybe this has already been asked before but I was wondering why the
> SourceReader interface has the method pollNext which hands the
> responsibility of outputting elements to the SourceReader
> implementation? Has this been done for backwards compatibility reasons
> with the old source interface? If not, then one could define a
> Collection<E> getNextRecords() method which returns the currently
> retrieved records and then the caller emits them outside of the
> SourceReader. That way the interface would not allow to implement an
> outputting loop where we never hand back control to the caller. At the
> moment, this contract can be easily broken and is only mentioned loosely
> in the JavaDocs.

The primary reason we hand over the SourceOutput to the SourceReader is that sometimes it is difficult for a SourceReader to emit one record at a time. One example is some batched messaging systems which only have an offset for the entire batch instead of individual messages in the batch. In that case, returning one record at a time would leave the SourceReader in an uncheckpointable state because it can only checkpoint at the batch boundaries.

Thanks,

Jiangjie (Becket) Qin
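A sketch of what that looks like in reader code: the whole batch is emitted within a single pollNext() call, so any checkpoint taken between calls lands on a batch boundary. The batch-fetching methods and offsets are placeholders, and the types are minimal stand-ins:

    import java.util.List;

    // Illustrative reader for a system that only tracks batch-level offsets.
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }
    interface SourceOutput<T> { void collect(T record); }

    abstract class BatchGranularReader<T> {
        private long lastBatchOffset; // the only offset the external system tracks

        // Placeholders: fetch the next batch (null if none) and its offset.
        abstract List<T> fetchNextBatch();
        abstract long currentBatchOffset();

        InputStatus pollNext(SourceOutput<T> output) {
            List<T> batch = fetchNextBatch();
            if (batch == null) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            for (T record : batch) {
                output.collect(record); // the whole batch goes out in one call
            }
            // State is updated only after the full batch is emitted, so a
            // checkpoint between pollNext() calls always sees a batch boundary.
            lastBatchOffset = currentBatchOffset();
            return InputStatus.MORE_AVAILABLE;
        }
    }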
On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi everyone,

thanks for drafting this FLIP. It reads very well.

Concerning Dawid's proposal, I tend to agree. The boundedness could come from the source and tell the system how to treat the operator (scheduling-wise). From a user's perspective it should be fine to get back a DataStream when calling env.source(boundedSource) if he does not need special operations defined on a BoundedDataStream. If he needs this, then one could use the method BoundedDataStream env.boundedSource(boundedSource).

If possible, we could enforce the proper usage of env.boundedSource() by introducing a BoundedSource type so that one cannot pass an unbounded source to it. That way users would not be able to shoot themselves in the foot.

Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records and then the caller emits them outside of the SourceReader. That way the interface would not allow to implement an outputting loop where we never hand back control to the caller. At the moment, this contract can be easily broken and is only mentioned loosely in the JavaDocs.

Cheers,
Till

On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi all,

I think the current design is good.

My understanding is:

For execution mode: bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, states, etc. We shouldn't confuse them.

For source capabilities: only bounded, only continuous, both bounded and continuous.
I think Kafka is a source that can be run in both bounded and continuous execution mode. And Kafka with an end offset should be able to run in both bounded and continuous execution mode. Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka in streaming mode. For our previous DataStream, it is not necessarily required that the source cannot be bounded.

So here are my thoughts on Dawid's questions:
1. Pass a bounded source to continuousSource(): +1.
2. Pass a continuous source to boundedSource(): -1, should throw an exception.

In StreamExecutionEnvironment, continuousSource and boundedSource define the execution mode. It defines a clear boundary of execution mode.

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:

I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource(). I think if we want to support something like `env.source()` that derives the execution mode from the source, the `supportsBoundedness(Boundedness)` method is not enough, because we don't know whether it is bounded or not.

Best,
Jark

On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.

Best,

Dawid
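This is the gap that Becket's Dec 10 reply above addresses with Source#createEnumerator(Boundedness, SplitEnumeratorContext); a simplified sketch of that signature, with the surrounding types reduced to stubs:

    // Simplified stubs, for illustration of the proposed signature only.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    interface SourceSplit {}
    interface SplitEnumeratorContext<SplitT extends SourceSplit> {}
    interface SplitEnumerator<SplitT extends SourceSplit> {}

    interface Source<T, SplitT extends SourceSplit> {
        // The environment call (continuousSource()/boundedSource()) would
        // pass the boundedness in here, so the enumerator receives the
        // information that supportsBoundedness(Boundedness) alone could
        // not convey.
        SplitEnumerator<SplitT> createEnumerator(
                Boundedness boundedness, SplitEnumeratorContext<SplitT> context);
    }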
Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:

Hi Dawid,

Thanks for the comments. This actually brings up another relevant question about what a "bounded source" implies. I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. A bounded source has the following impacts.

1. API validity.
- A bounded source generates a bounded stream, so some operations that only work on bounded records become available, e.g. sort.
- To expose these bounded-stream-only APIs, there are two options:
     a. Add them to the DataStream API and throw an exception if a method is called on an unbounded stream.
     b. Create a BoundedDataStream class which is returned from env.boundedSource(), while DataStream is returned from env.continousSource().
Note that this cannot be done by having a single env.source(theSource), even if the Source has a getBoundedness() method.

2. Scheduling
- A bounded source could be computed stage by stage without bringing up all the tasks at the same time.

3. Operator behaviors
- A bounded source indicates the records are finite, so some operators can wait until they have received all the records before starting the processing.

Of the above impacts, only 1 is relevant to the API design, and the current proposal in FLIP-27 follows option 1.b.
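To make the two options concrete, here is a rough sketch with hypothetical classes (not Flink's real DataStream):

    // Option a: one stream type; bounded-only operations fail at runtime.
    class UnifiedStream<T> {
        private final boolean bounded;
        UnifiedStream(boolean bounded) { this.bounded = bounded; }

        UnifiedStream<T> sort() {
            if (!bounded) {
                throw new IllegalStateException("sort() needs a bounded stream");
            }
            return this;
        }
    }

    // Option b: bounded-only operations live on a dedicated subtype that
    // only env.boundedSource() returns, so misuse is a compile-time error.
    class StreamBase<T> { /* no sort() here */ }

    class BoundedStream<T> extends StreamBase<T> {
        BoundedStream<T> sort() { return this; }
    }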
> // boundedness depends on a source property; imo this should always be preferred
> DataStream<MyType> stream = env.source(theSource);

In your proposal, does DataStream have bounded-stream-only methods? It looks like it should; otherwise passing a bounded Source to env.source() would be confusing. In that case, we would essentially do 1.a if an unbounded Source is created from env.source(unboundedSource).

If we have the methods only supported for bounded streams in DataStream, it seems a little weird to also have a separate BoundedDataStream interface.

Am I understanding it correctly?

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi all,

Really well written proposal, and a very important one. I must admit I have not understood all the intricacies of it yet.

One question I have though is about where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say:

new KafkaSource().readUntil(long timestamp),

which could work as an "end offset".
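A sketch of how such an end condition could make boundedness a property of the source instance (hypothetical builder, not the real Kafka connector API):

    // Hypothetical source: bounded exactly when an end condition is set.
    final class KafkaLikeSource {
        private long endTimestamp = Long.MAX_VALUE;

        KafkaLikeSource readUntil(long timestamp) {
            this.endTimestamp = timestamp;  // acts like an "end offset"
            return this;
        }

        Boundedness getBoundedness() {
            return endTimestamp == Long.MAX_VALUE
                    ? Boundedness.CONTINUOUS_UNBOUNDED
                    : Boundedness.BOUNDED;
        }

        enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    }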
Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports bounded mode. If I understood the proposal correctly, it suggests the boundedness sort of "comes" from the outside of the source, from the invocation of either boundedStream or continousSource.

I am wondering if it would make sense to actually change the method

boolean Source#supportsBoundedness(Boundedness)

to

Boundedness Source#getBoundedness().

As for the methods #boundedSource, #continousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the DAG is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source. In general I would expect users to only call env.source(theSource), where the source tells if it is bounded or not.
I would suggest considering the following set of methods:

// boundedness depends on a source property; imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);

// always continuous execution, whether bounded or unbounded source
DataStream<MyType> boundedStream = env.continousSource(theSource);

// imo this would make sense if the BoundedDataStream provides additional features unavailable in continuous mode
BoundedDataStream<MyType> batch = env.boundedSource(theSource);
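Spelled out minimally with getBoundedness() as the intrinsic property (hypothetical types, just to illustrate the shape):

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    interface Source<T> {
        // Intrinsic property of the source instance, e.g. set once an
        // end offset / end timestamp has been configured.
        Boundedness getBoundedness();
    }

    class EnvSketch {
        // Preferred entry point: execution mode follows the source property.
        <T> String source(Source<T> s) {
            return s.getBoundedness() == Boundedness.BOUNDED ? "batch" : "streaming";
        }

        // Explicit override: continuous execution even for a bounded source.
        <T> String continuousSource(Source<T> s) {
            return "streaming";
        }

        // Explicit bounded mode: must reject a source that is not bounded.
        <T> String boundedSource(Source<T> s) {
            if (s.getBoundedness() != Boundedness.BOUNDED) {
                throw new IllegalArgumentException("source is not bounded");
            }
            return "batch";
        }
    }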
Best,

Dawid

On 04/12/2019 11:25, Stephan Ewen wrote:

Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:
1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detail API method signature changes.

We left a few things out of this FLIP and will address them in separate FLIPs, including:
1. Per split event time.
2. Event time alignment.
3. Fine grained failover for SplitEnumerator failure.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:

Hi Łukasz!

Becket and I are working hard on figuring out the last details and implementing the first PoC. We would update the FLIP hopefully next week.

There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle test it and migrate the connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have any plans for which Flink release it is going to be in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.

Thanks,
Łukasz

On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!
I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD.

It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).

We currently have only one implementation for event time alignment, in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter). With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.

Synchronization has not been implemented for the Kafka consumer yet.

https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite different from Kinesis, because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client.
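For reference, the kind of control this needs from the client, sketched with the plain Kafka consumer API (the per-partition watermark bookkeeping is assumed to exist elsewhere):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class AlignmentSketch {
        // Pause partitions that have run more than maxAheadMillis past the
        // slowest partition's watermark; resume the ones within bounds.
        static void align(KafkaConsumer<?, ?> consumer,
                          Map<TopicPartition, Long> watermarks,
                          long maxAheadMillis) {
            long min = Collections.min(watermarks.values());
            List<TopicPartition> ahead = new ArrayList<>();
            List<TopicPartition> within = new ArrayList<>();
            for (Map.Entry<TopicPartition, Long> e : watermarks.entrySet()) {
                (e.getValue() > min + maxAheadMillis ? ahead : within).add(e.getKey());
            }
            consumer.pause(ahead);    // stop fetching from fast partitions
            consumer.resume(within);  // keep consuming the slow ones
        }
    }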
To take on that responsibility, the split thread would need to be aware of the watermarks, or at least of whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.

Thanks,
Thomas

On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback!
Will take a look at your branch before discussing publicly.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface.
https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime

https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are so many things mentioned in the document of FLIP-27 [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss, so I wrote a Google doc about the SplitReader API which fills in some details missing from the document. [2]

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,
Thank you for the feedback. Please take a look at the FLIP-27 document <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>, which was updated recently. A lot of details about the enumerator were added in this document. I think it would help.
On Thu, Mar 28, 2019 at 12:52 PM, Steven Wu <stevenz...@gmail.com> wrote:

This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implications of running the enumerator on the JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently.
Anyway, wish to see the split topics soon :)

On Thu, Jan 24, 2019 at 8:21 PM, Piotr Nowojski <pi...@da-platform.com> wrote:

Hi Biao!
This discussion was stalled because of preparations for the open sourcing & merging of Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for them.

I think there is no chance for this to be completed in the couple of remaining weeks / 1 month before the 1.8 feature freeze; however, it would be good to aim with those changes for 1.9.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,
The summary by Stephan makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into smaller ones.
I was wondering whether there is any detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues.
Another question is: should version 1.8 include these features?

On Sat, Dec 1, 2018 at 4:20 AM, Stephan Ewen <se...@apache.org> wrote:

Thanks everyone for the lively discussion.
Let me try to summarize where I see convergence in the discussion and what the open issues are. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would try and update the FLIP in the next days. For the remaining ones we need more discussion.

I would suggest to fork each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams)
- This discussion is orthogonal and should come later, when the interface is agreed upon.

*(2) Split Readers for one or more splits*

- Discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions
    --> example sources that require that would be Kafka, Pravega, Pulsar
- Could have one split reader per source, or multiple split readers that share the "poll()" function
- To not make it too complicated, we can start with thinking about one split reader for all splits initially and see if that covers all requirements
*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to support a task runtime based on a single-threaded/actor-style task design
    --> I personally am a big proponent of that, it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code
- Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader, which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more), and the thread(s) can make blocking calls and hand over data buffers via a blocking queue
- This should allow us to cover both a fully async runtime and a simple blocking interface for users. This is actually very similar to how the Kafka connectors work: Kafka 9+ with one thread, Kafka 8 with multiple threads
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (can await completion or register notification handlers)
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox (see the sketch below)
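A condensed sketch of this threading model, with hypothetical names (the races around re-arming the future are glossed over here):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;

    // Base contract: non-blocking; availability is signaled via a future.
    interface AsyncSplitReader<E> {
        CompletableFuture<Void> isAvailable();  // completes when data is ready
        E pollNext();                           // null if nothing is ready
    }

    // Convenience base: user code makes plain blocking calls in a spawned
    // fetcher thread; data is handed over via a bounded blocking queue.
    abstract class BlockingSplitReaderSketch<E> implements AsyncSplitReader<E> {
        private final BlockingQueue<E> handover = new ArrayBlockingQueue<>(100);
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        // Implemented by the user, e.g. a blocking poll() against the system.
        protected abstract E fetchBlocking() throws Exception;

        final void fetchLoop() throws Exception {  // runs in the fetcher thread
            while (true) {
                handover.put(fetchBlocking());  // may block on a full queue
                available.complete(null);       // wake up the task thread
            }
        }

        @Override public CompletableFuture<Void> isAvailable() { return available; }

        @Override public E pollNext() {
            E next = handover.poll();
            if (next == null) {
                available = new CompletableFuture<>();  // re-arm the signal
            }
            return next;
        }
    }

The task side would then do something like reader.isAvailable().thenRun(() -> mailbox.add(takeNextData)).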
*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many), and where splits are discovered over time
- Assignment should also be lazy, to get better load balancing
- Assignment needs to support locality preferences
- Possible design based on the discussion so far (see the sketch below):
    --> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they have only one split ever, concurrently, others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
    --> SplitReader has a context object on which it can call to indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in cases of sources that take splits lazily (file readers) and in cases where the source needs to preserve a partial order between splits (Kinesis, Pravega, Pulsar may need that).
    --> The SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader
    --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?
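The reader-facing part of this could look roughly as follows (illustrative names only):

    import java.util.List;

    interface SplitReaderSketch<SplitT> {
        // One or more splits can be handed over at once (eases startup,
        // where several splits may be assigned to a reader instantly).
        void addSplits(List<SplitT> splits);
    }

    interface ReaderContextSketch<SplitT> {
        // The reader reports a finished split; the enumerator receives the
        // notification and can decide to assign further splits.
        void splitFinished(SplitT split);
    }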
*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka source, per partition)
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed
- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split (see the sketch below).
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently
- Need a way to "dispatch" the next record to different watermark generators
- Need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure)
- This would in fact not be needed (and thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion
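One way to picture the per-split generators and the dispatching (hypothetical shapes):

    import java.util.HashMap;
    import java.util.Map;

    interface SplitWatermarkGenerator<E> {
        void onEvent(E event, long timestamp);
        long currentWatermark();
    }

    // Each record is dispatched to the generator of the split it came from;
    // the reader's overall watermark is the minimum across active splits.
    class PerSplitWatermarks<E> {
        private final Map<String, SplitWatermarkGenerator<E>> bySplit = new HashMap<>();

        void register(String splitId, SplitWatermarkGenerator<E> generator) {
            bySplit.put(splitId, generator);
        }

        void onEvent(String splitId, E event, long timestamp) {
            bySplit.get(splitId).onEvent(event, timestamp);
        }

        long combinedWatermark() {
            return bySplit.values().stream()
                    .mapToLong(SplitWatermarkGenerator::currentWatermark)
                    .min().orElse(Long.MIN_VALUE);
        }
    }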
*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the create timestamp of file splits)
- If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.
- One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark (sketched below)
- Many split enumerators would simply immediately send Long.MAX out and leave the progress purely to the split readers
- For event-time alignment / split back pressure, this begs the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure
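The gating rule reduced to a few lines (sketch):

    class WatermarkGate {
        private volatile long enumeratorBound = Long.MIN_VALUE;

        void onEnumeratorWatermark(long watermark) {
            enumeratorBound = watermark;  // e.g. Long.MAX_VALUE if no overlap
        }

        // Reader side: never emit beyond what the enumerator has allowed,
        // so later splits with overlapping time ranges cannot become late.
        long emittable(long localSplitWatermark) {
            return Math.min(localSplitWatermark, enumeratorBound);
        }
    }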
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> *(7) Batch and streaming Unification*
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> - Functionality-wise, the above design should support both.
> >> > > >> >>>>>>>>>> - Batch often (mostly) does not care about reading "in order" and generating watermarks
> >> > > >> >>>>>>>>>>   --> might use different enumerator logic that is more locality aware and ignores event time order
> >> > > >> >>>>>>>>>>   --> does not generate watermarks
> >> > > >> >>>>>>>>>> - Would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream".
> >> > > >> >>>>>>>>>> - Possible to defer this discussion until later.
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> *Miscellaneous Comments*
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> - Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyways, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
> >> > > >> >>>>>>>>>> - The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently, support flexible ways of evolution, etc. For metadata I would suggest looking at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think that it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
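To make the suggestion concrete, a minimal sketch of a SimpleVersionedSerializer (the existing org.apache.flink.core.io.SimpleVersionedSerializer interface) for a hypothetical file-split metadata type; FileSplit is invented for the example:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// hypothetical split metadata: a file path and a read offset
class FileSplit {
    private final String path;
    private final long offset;
    FileSplit(String path, long offset) { this.path = path; this.offset = offset; }
    String path() { return path; }
    long offset() { return offset; }
}

public class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(FileSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.path());
            out.writeLong(split.offset());
        }
        return bytes.toByteArray();
    }

    @Override
    public FileSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new FileSplit(in.readUTF(), in.readLong());
        }
    }
}

Versioning is handled explicitly in deserialize(int, byte[]), outside the per-record path, which is what makes this interface a lighter fit for metadata than TypeSerializer.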
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> Hi Biao,
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> Thanks for the answer!
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> So given the multi-threaded readers, we now have the following open questions:
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> 1) How do we let the checkpoints pass through our multi-threaded reader operator?
> >> > > >> >>>>>>>>>> 2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> For the 2nd, I think that we should go with separate operators for the source and the readers, for the following reasons:
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> 1) This is more aligned with a potential future improvement where split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.
> >> > > >> >>>>>>>>>> 2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> i) we need a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId), which is not necessarily trivial for all sources (see the sketch after point iii below);
> >> > > >> >>>>>>>>>> ii) each reader would have to keep a copy of all the splits it has processed;
> >> > > >> >>>>>>>>>> iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.
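A minimal sketch of what points i) to iii) add up to, built around Flink's union list state; the CheckpointedFunction and getUnionListState APIs are real, while the reader class itself and its method names are invented for the example:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// abstract sketch: the actual read loop (run/cancel) is omitted
public abstract class SelfAssigningReader<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction {

    private transient ListState<String> unionState;
    private final Set<String> processedSplits = new HashSet<>();

    // i) deterministic self-assignment: every subtask must see the same
    // ordered split list and keeps the entries matching its own index
    protected List<String> splitsForThisSubtask(List<String> allSplits) {
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allSplits.size(); i++) {
            String split = allSplits.get(i);
            // ii) skip splits this reader already processed
            if (i % parallelism == subtask && !processedSplits.contains(split)) {
                mine.add(split);
            }
        }
        return mine;
    }

    protected void markProcessed(String splitId) {
        processedSplits.add(splitId);
    }

    // iii) union state: on restore, each subtask receives the union of
    // all subtasks' lists and has to de-duplicate / merge it itself
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("processed-splits", String.class));
        for (String splitId : unionState.get()) {
            processedSplits.add(splitId);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionState.update(new ArrayList<>(processedSplits));
    }
}

The restore loop is where the non-trivial merging lives: after rescaling, every subtask receives every other subtask's entries, so for an infinite source this state grows without bound, which is exactly the concern point i) below raises.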
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> Two additional points that you raised above:
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> i) The point that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. This would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.
> >> > > >> >>>>>>>>>>
> >> > > >> >>>>>>>>>> Cheers,
> >> > > >> >>>>>>>>>> Kostas