A quick update:

- Since there is currently another initiative ongoing for building rate limiting that potentially covers a wider range of use cases, it was decided not to expose the RateLimiter API publicly in this FLIP. It now has package-private visibility and can later be swapped with a more universal interface.
- The sourceRatePerSecond parameter type was changed from long to double to allow generating less than one event per second.
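To illustrate why a double-typed rate matters, here is a minimal sketch of how a fractional global rate could be translated into a per-reader emission interval. It assumes the global rate is split evenly across parallel readers (the "keep the global rate untouched on scaling" semantics discussed in this thread). The class and method names (RateSplitSketch, perSubtaskRate, emitIntervalNanos) are hypothetical and are not part of the FLIP or of the package-private RateLimiter.

```java
public class RateSplitSketch {

    /**
     * Hypothetical helper: the rate each parallel reader may emit at so that
     * the sum over all readers equals the configured global rate.
     */
    static double perSubtaskRate(double sourceRatePerSecond, int parallelism) {
        if (sourceRatePerSecond <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("rate and parallelism must be positive");
        }
        return sourceRatePerSecond / parallelism;
    }

    /** Hypothetical helper: nanoseconds one reader waits between two records. */
    static long emitIntervalNanos(double sourceRatePerSecond, int parallelism) {
        return Math.round(1_000_000_000d / perSubtaskRate(sourceRatePerSecond, parallelism));
    }

    public static void main(String[] args) {
        // 0.5 events/s on a single reader: one record every 2 seconds.
        System.out.println(emitIntervalNanos(0.5, 1));
        // 100 events/s split across 4 readers: 25 events/s each, i.e. one record every 40 ms.
        System.out.println(emitIntervalNanos(100.0, 4));
    }
}
```

With a long-typed rate the first case could not be expressed at all, since the smallest configurable value would be one event per second.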
Best,
Alexander Fedulov

On Mon, Jul 18, 2022 at 5:23 PM Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi all,
>
> I updated the FLIP [1] to make it more extensible with the introduction of
> *SourceReaderFactory*. It gives users the ability to further customize the
> data generation and emission process if needed. I also incorporated the
> suggestion from Qingsheng and moved to the generator function design with an
> initializer method to support more sophisticated functions with
> non-serializable fields. I am personally pretty happy with the current
> prototype [2], [3]. Let me know if you have any other feedback, otherwise,
> I am going to start the vote.
>
> [1] https://cwiki.apache.org/confluence/x/9Av1D
> [2] https://github.com/afedulov/flink/blob/e5f8e555543a67b9983c42b5db2a7617361fa459/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV4.java#L52
> [3] https://github.com/afedulov/flink/blob/e5f8e555543a67b9983c42b5db2a7617361fa459/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/GeneratorSourcePOC.java#L92
>
> Best,
> Alexander Fedulov
>
> On Thu, Jul 7, 2022 at 12:08 AM Alexander Fedulov <alexan...@ververica.com> wrote:
>
>> Hi Becket,
>>
>> interesting points about the discrepancies in the *RuntimeContext*
>> "wrapping" throughout the framework, but I agree - this is something that
>> needs to be tackled separately.
>> For now, I adjusted the FLIP and the PoC implementation to only expose
>> the parallelism.
>>
>> Best,
>> Alexander Fedulov
>>
>> On Wed, Jul 6, 2022 at 2:42 AM Becket Qin <becket....@gmail.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Personally I prefer the latter option, i.e. just add the
>>> currentParallelism() method. It is easy to add more stuff to the
>>> SourceReaderContext in the future, and it is likely that most of the stuff
>>> in the RuntimeContext is not required by the SourceReader implementations.
>>> For the purpose of this FLIP, adding the method is probably good enough.
>>>
>>> That said, I don't see a consistent pattern adopted in the project to
>>> handle similar cases. The FunctionContext wraps the RuntimeContext and only
>>> exposes necessary stuff. CEPRuntimeContext extends the RuntimeContext and
>>> overrides some methods that it does not want to expose with exception
>>> throwing logic. Some internal context classes simply expose the entire
>>> RuntimeContext with some additional methods. If we want to make things
>>> clean, I'd imagine all these variations of context can become some specific
>>> combination of a ReadOnlyRuntimeContext and some "write" methods. But this
>>> may require a closer look at all these cases to make sure the
>>> ReadOnlyRuntimeContext is generally suitable. I feel that it will take some
>>> time and could be a bigger discussion than the data generator source
>>> itself. So maybe we can just go with adding a method at the moment, and
>>> evolve the SourceReaderContext to use the ReadOnlyRuntimeContext in the
>>> future.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Tue, Jul 5, 2022 at 8:31 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>>>
>>> > Hi Becket,
>>> >
>>> > I agree with you. We could introduce a *ReadOnlyRuntimeContext* that would
>>> > act as a holder for the *RuntimeContext* data. This would also require
>>> > read-only wrappers for the exposed fields, such as *ExecutionConfig*.
>>> > Alternatively, we just add the *currentParallelism()* method for now and
>>> > see if anything else might actually be needed later on. What do you think?
>>> >
>>> > Best,
>>> > Alexander Fedulov
>>> >
>>> > On Tue, Jul 5, 2022 at 2:30 AM Becket Qin <becket....@gmail.com> wrote:
>>> >
>>> > > Hi Alex,
>>> > >
>>> > > While it is true that the RuntimeContext gives access to all the stuff the
>>> > > framework can provide, it seems a little overkill for the SourceReader.
>>> > > It is probably OK to expose all the read-only information in the
>>> > > RuntimeContext to the SourceReader, but we may want to hide the "write"
>>> > > methods, such as creating states, writing stuff to distributed cache, etc.,
>>> > > because these methods may not work well with the SourceReader design and
>>> > > cause confusion. For example, users may wonder why the snapshotState()
>>> > > method exists while they can use the state directly.
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jiangjie (Becket) Qin
>>> > >
>>> > > On Tue, Jul 5, 2022 at 7:37 AM Alexander Fedulov <alexan...@ververica.com> wrote:
>>> > >
>>> > > > Hi Becket,
>>> > > >
>>> > > > I updated and extended FLIP-238 accordingly.
>>> > > >
>>> > > > Here is also my POC branch [1].
>>> > > > DataGeneratorSourceV3 is the class that I currently converged on [2]. It is
>>> > > > based on the expanded SourceReaderContext.
>>> > > > A couple more relevant classes [3] [4]
>>> > > >
>>> > > > Would appreciate it if you could take a quick look.
>>> > > >
>>> > > > [1] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
>>> > > > [2] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
>>> > > > [3] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
>>> > > > [4] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java
>>> > > >
>>> > > > Best,
>>> > > > Alexander Fedulov
>>> > > >
>>> > > > On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>>> > > >
>>> > > > > Hi Becket,
>>> > > > >
>>> > > > > Exposing the RuntimeContext is potentially even more useful.
>>> > > > > Do you think it is worth having both currentParallelism() and
>>> > > > > getRuntimeContext() methods?
>>> > > > > One can always call getNumberOfParallelSubtasks() on the RuntimeContext
>>> > > > > directly if we expose it.
>>> > > > >
>>> > > > > Best,
>>> > > > > Alexander Fedulov
>>> > > > >
>>> > > > > On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com> wrote:
>>> > > > >
>>> > > > >> Hi Alex,
>>> > > > >>
>>> > > > >> Yes, that is what I had in mind. We need to add the method
>>> > > > >> getRuntimeContext() to the SourceReaderContext interface as well.
>>> > > > >>
>>> > > > >> Thanks,
>>> > > > >>
>>> > > > >> Jiangjie (Becket) Qin
>>> > > > >>
>>> > > > >> On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov <alexan...@ververica.com> wrote:
>>> > > > >>
>>> > > > >> > Hi Becket,
>>> > > > >> >
>>> > > > >> > thanks for your input. I like the idea of adding the parallelism to the
>>> > > > >> > SourceReaderContext. My understanding is that any change of parallelism
>>> > > > >> > causes recreation of all readers, so it should be safe to consider it
>>> > > > >> > "fixed" after the readers' initialization. In that case, it should be as
>>> > > > >> > simple as adding the following to the anonymous SourceReaderContext
>>> > > > >> > implementation in SourceOperator#initReader():
>>> > > > >> >
>>> > > > >> > public int currentParallelism() {
>>> > > > >> >     return getRuntimeContext().getNumberOfParallelSubtasks();
>>> > > > >> > }
>>> > > > >> >
>>> > > > >> > Is that what you had in mind?
>>> > > > >> >
>>> > > > >> > Best,
>>> > > > >> > Alexander Fedulov
>>> > > > >> >
>>> > > > >> > On Fri, Jul 1, 2022 at 11:30 AM Becket Qin <becket....@gmail.com> wrote:
>>> > > > >> >
>>> > > > >> > > Hi Alex,
>>> > > > >> > >
>>> > > > >> > > In FLIP-27 source, the SourceReader can get a SourceReaderContext. This is
>>> > > > >> > > passed in by the TM in Source#createReader(). And supposedly the Source
>>> > > > >> > > should pass this to the SourceReader if needed.
>>> > > > >> > >
>>> > > > >> > > In the SourceReaderContext, currently only the index of the current subtask
>>> > > > >> > > is available, but we can probably add the current parallelism as well.
>>> > > > >> > This >>> > > > >> > > would be a change that affects all the Sources, not only >>> for the >>> > > > data >>> > > > >> > > generator source. Perhaps we can have a simple separate >>> FLIP. >>> > > > >> > > >>> > > > >> > > Regarding the semantic of rate limiting, for the rate limit >>> > > source, >>> > > > >> > > personally I feel intuitive to keep the global rate >>> untouched on >>> > > > >> scaling. >>> > > > >> > > >>> > > > >> > > Thanks, >>> > > > >> > > >>> > > > >> > > Jiangjie (Becket) Qin >>> > > > >> > > >>> > > > >> > > On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov < >>> > > > >> > alexan...@ververica.com> >>> > > > >> > > wrote: >>> > > > >> > > >>> > > > >> > > > Hi all, >>> > > > >> > > > >>> > > > >> > > > getting back to the idea of reusing >>> FlinkConnectorRateLimiter: >>> > > it >>> > > > is >>> > > > >> > > > designed for the SourceFunction API and has an open() >>> method >>> > > that >>> > > > >> > takes a >>> > > > >> > > > RuntimeContext. Therefore, we need to add a different >>> > interface >>> > > > for >>> > > > >> > > > the new Source >>> > > > >> > > > API. >>> > > > >> > > > >>> > > > >> > > > This is where I see a certain limitation for the >>> rate-limiting >>> > > use >>> > > > >> > case: >>> > > > >> > > in >>> > > > >> > > > the old API the individual readers were able to retrieve >>> the >>> > > > current >>> > > > >> > > > parallelism from the RuntimeContext. In the new API, this >>> is >>> > not >>> > > > >> > > supported, >>> > > > >> > > > the information about the parallelism is only available >>> in the >>> > > > >> > > > SplitEnumeratorContext to which the readers do not have >>> > access. >>> > > > >> > > > >>> > > > >> > > > I see two possibilities: >>> > > > >> > > > 1. Add an optional RateLimiter parameter to the >>> > > > DataGeneratorSource >>> > > > >> > > > constructor. 
The RateLimiter is then "fixed" and has to be >>> > fully >>> > > > >> > > configured >>> > > > >> > > > by the user in the main method. >>> > > > >> > > > 2. Piggy-back on Splits: add parallelism as a field of a >>> > Split. >>> > > > The >>> > > > >> > > > initialization of this field would happen dynamically upon >>> > > splits >>> > > > >> > > creation >>> > > > >> > > > in the createEnumerator() method where currentParallelism >>> is >>> > > > >> available. >>> > > > >> > > > >>> > > > >> > > > The second approach makes implementation rather >>> significantly >>> > > more >>> > > > >> > > > complex since we cannot simply wrap >>> > > > >> > NumberSequenceSource.SplitSerializer >>> > > > >> > > in >>> > > > >> > > > that case. The advantage of this approach is that with any >>> > kind >>> > > of >>> > > > >> > > > autoscaling, the source rate will match the original >>> > > > configuration. >>> > > > >> But >>> > > > >> > > I'm >>> > > > >> > > > not sure how useful this is. I can even imagine scenarios >>> > where >>> > > > >> scaling >>> > > > >> > > the >>> > > > >> > > > input rate together with parallelism would be better for >>> demo >>> > > > >> purposes. >>> > > > >> > > > >>> > > > >> > > > Would be glad to hear your thoughts on this. >>> > > > >> > > > >>> > > > >> > > > Best, >>> > > > >> > > > Alexander Fedulov >>> > > > >> > > > >>> > > > >> > > > On Mon, Jun 20, 2022 at 4:31 PM David Anderson < >>> > > > >> dander...@apache.org> >>> > > > >> > > > wrote: >>> > > > >> > > > >>> > > > >> > > > > I'm very happy with this. +1 >>> > > > >> > > > > >>> > > > >> > > > > A lot of SourceFunction implementations used in >>> demos/POC >>> > > > >> > > implementations >>> > > > >> > > > > include a call to sleep(), so adding rate limiting is a >>> good >>> > > > >> idea, in >>> > > > >> > > my >>> > > > >> > > > > opinion. 
>>> > > > >> > > > > >>> > > > >> > > > > Best, >>> > > > >> > > > > David >>> > > > >> > > > > >>> > > > >> > > > > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren < >>> > > > >> renqs...@gmail.com> >>> > > > >> > > > wrote: >>> > > > >> > > > > >>> > > > >> > > > > > Hi Alexander, >>> > > > >> > > > > > >>> > > > >> > > > > > Thanks for creating this FLIP! I’d like to share some >>> > > > thoughts. >>> > > > >> > > > > > >>> > > > >> > > > > > 1. About the “generatorFunction” I’m expecting an >>> > > initializer >>> > > > >> on it >>> > > > >> > > > > > because it’s hard to require all fields in the >>> generator >>> > > > >> function >>> > > > >> > are >>> > > > >> > > > > > serializable in user’s implementation. Providing a >>> > function >>> > > > like >>> > > > >> > > “open” >>> > > > >> > > > > in >>> > > > >> > > > > > the interface could let the function to make some >>> > > > >> initializations >>> > > > >> > in >>> > > > >> > > > the >>> > > > >> > > > > > task initializing stage. >>> > > > >> > > > > > >>> > > > >> > > > > > 2. As of the throttling functinality you mentioned, >>> > there’s >>> > > a >>> > > > >> > > > > > FlinkConnectorRateLimiter under flink-core and maybe >>> we >>> > > could >>> > > > >> reuse >>> > > > >> > > > this >>> > > > >> > > > > > interface. Actually I prefer to make rate limiting as >>> a >>> > > common >>> > > > >> > > feature >>> > > > >> > > > > > provided in the Source API, but this requires another >>> FLIP >>> > > > and a >>> > > > >> > lot >>> > > > >> > > of >>> > > > >> > > > > > discussions so I’m OK to have it in the DataGen source >>> > > first. 
>>> > > > >> > > > > > >>> > > > >> > > > > > Best regards, >>> > > > >> > > > > > Qingsheng >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > > On Jun 17, 2022, at 01:47, Alexander Fedulov < >>> > > > >> > > > alexan...@ververica.com> >>> > > > >> > > > > > wrote: >>> > > > >> > > > > > > >>> > > > >> > > > > > > Hi Jing, >>> > > > >> > > > > > > >>> > > > >> > > > > > > thanks for your thorough analysis. I agree with the >>> > points >>> > > > you >>> > > > >> > make >>> > > > >> > > > and >>> > > > >> > > > > > > also with the idea to approach the larger task of >>> > > providing >>> > > > a >>> > > > >> > > > universal >>> > > > >> > > > > > > (DataStream + SQL) data generator base iteratively. >>> > > > >> > > > > > > Regarding the name, the SourceFunction-based >>> > > > >> > *DataGeneratorSource* >>> > > > >> > > > > > resides >>> > > > >> > > > > > > in the >>> > > > >> > *org.apache.flink.streaming.api.functions.source.datagen*. I >>> > > > >> > > > > think >>> > > > >> > > > > > > it is OK to simply place the new one (with the same >>> > name) >>> > > > >> next to >>> > > > >> > > the >>> > > > >> > > > > > > *NumberSequenceSource* into >>> > > > >> > > > > *org.apache.flink.api.connector.source.lib*. >>> > > > >> > > > > > > >>> > > > >> > > > > > > One more thing I wanted to discuss: I noticed that >>> > > > >> > > > *DataGenTableSource >>> > > > >> > > > > > *has >>> > > > >> > > > > > > built-in throttling functionality >>> (*rowsPerSecond*). 
I >>> > > > >> believe it >>> > > > >> > > is >>> > > > >> > > > > > > something that could be also useful for the >>> DataStream >>> > > users >>> > > > >> of >>> > > > >> > the >>> > > > >> > > > > > > stateless data generator and since we want to >>> eventually >>> > > > >> converge >>> > > > >> > > on >>> > > > >> > > > > the >>> > > > >> > > > > > > same implementation for DataStream and Table/SQL it >>> > sounds >>> > > > >> like a >>> > > > >> > > > good >>> > > > >> > > > > > idea >>> > > > >> > > > > > > to add it to the FLIP. What do you think? >>> > > > >> > > > > > > >>> > > > >> > > > > > > Best, >>> > > > >> > > > > > > Alexander Fedulov >>> > > > >> > > > > > > >>> > > > >> > > > > > > >>> > > > >> > > > > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge < >>> > > j...@ververica.com >>> > > > > >>> > > > >> > > wrote: >>> > > > >> > > > > > > >>> > > > >> > > > > > >> Hi, >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> After reading all discussions posted in this >>> thread and >>> > > the >>> > > > >> > source >>> > > > >> > > > > code >>> > > > >> > > > > > of >>> > > > >> > > > > > >> DataGeneratorSource which unfortunately used >>> "Source" >>> > > > >> instead of >>> > > > >> > > > > > >> "SourceFunction" in its name, issues could >>> summarized >>> > as >>> > > > >> > > following: >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> 1. The current DataGeneratorSource based on >>> > > SourceFunction >>> > > > >> is a >>> > > > >> > > > > stateful >>> > > > >> > > > > > >> source connector and built for Table/SQL. >>> > > > >> > > > > > >> 2. The right name for the new data generator source >>> > i.e. >>> > > > >> > > > > > >> DataGeneratorSource has been used for the current >>> > > > >> implementation >>> > > > >> > > > based >>> > > > >> > > > > > on >>> > > > >> > > > > > >> SourceFunction. >>> > > > >> > > > > > >> 3. 
A new data generator source should be developed based on the new Source API.
>>> > > > >> > > > > > >> 4. The new data generator source should be used both for DataStream and
>>> > > > >> > > > > > >> Table/SQL, which means the current DataGeneratorSource should be replaced
>>> > > > >> > > > > > >> with the new one.
>>> > > > >> > > > > > >> 5. The core event generation logic should be pluggable to support various
>>> > > > >> > > > > > >> (test) scenarios, e.g. random stream, changelog stream, controllable events
>>> > > > >> > > > > > >> per checkpoint, etc.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> It turns out that:
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> To solve 1+3+4 -> we will have to make a big effort to replace the current
>>> > > > >> > > > > > >> DataGeneratorSource since the new Source API has a very different
>>> > > > >> > > > > > >> concept, especially for the stateful part.
>>> > > > >> > > > > > >> To solve 2+3 -> we have to find another name for the new implementation.
>>> > > > >> > > > > > >> To solve 1+3+4+5 -> it gets even more complicated to support stateless and
>>> > > > >> > > > > > >> stateful scenarios simultaneously with one solution.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> If we want to solve all of these issues in one shot, it might take months.
>>> > > > >> > > > > > >> Therefore, I would suggest starting small and growing iteratively.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> The proposal for the kickoff is to focus on stateless event generation
>>> > > > >> > > > > > >> with e.g. a random stream and use the name "StatelessDataGeneratorSource".
>>> > > > >> > > > > > >> There will be a period of time in which both DataGeneratorSource
>>> > > > >> > > > > > >> implementations will be used by developers. The current DataGeneratorSource
>>> > > > >> > > > > > >> will then be deprecated, once we can (iteratively):
>>> > > > >> > > > > > >> 1. either enlarge the scope of StatelessDataGeneratorSource to be able to
>>> > > > >> > > > > > >> cover stateful scenarios and rename it to "DataGeneratorSourceV2" (following
>>> > > > >> > > > > > >> the naming convention of SinkV2) or
>>> > > > >> > > > > > >> 2. develop a new "StatefulDataGeneratorSource" based on the Source API which
>>> > > > >> > > > > > >> can handle the stateful scenarios, if it is impossible to support both
>>> > > > >> > > > > > >> stateless and stateful scenarios with one GeneratorSource implementation.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> Best regards,
>>> > > > >> > > > > > >> Jing
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org> wrote:
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >>> Hey Alex,
>>> > > > >> > > > > > >>>
>>> > > > >> > > > > > >>> Yes, I think we need to make sure that we're not causing confusion (I know
>>> > > > >> > > > > > >>> I already was confused). I think the DataSupplierSource is already better,
>>> > > > >> > > > > > >>> but perhaps there are others who have an even better idea.
>>> > > > >> > > > > > >>>
>>> > > > >> > > > > > >>> Thanks,
>>> > > > >> > > > > > >>>
>>> > > > >> > > > > > >>> Martijn
>>> > > > >> > > > > > >>>
>>> > > > >> > > > > > >>> On Thu, Jun 9, 2022 at 2:28 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>>> > > > >> > > > > > >>>
>>> > > > >> > > > > > >>>> Hi Martijn,
>>> > > > >> > > > > > >>>>
>>> > > > >> > > > > > >>>> It seems that they serve somewhat different purposes though. The
>>> > > > >> > > > > > >>>> DataGenTableSource is for generating random data described by the Table
>>> > > > >> > > > > > >>>> DDL and is tied into the RowDataGenerator/DataGenerator concept which is
>>> > > > >> > > > > > >>>> implemented as an Iterator<T>. The proposed API in contrast is supposed
>>> > > > >> > > > > > >>>> to provide users with an easy way to supply their custom data.
>>> > > > >> > > > > Another >>> > > > >> > > > > > >>>> difference is that a DataGenerator is supposed >>> to be >>> > > > >> stateful >>> > > > >> > > and >>> > > > >> > > > > has >>> > > > >> > > > > > to >>> > > > >> > > > > > >>>> snapshot its state, whereas the proposed API is >>> > purely >>> > > > >> driven >>> > > > >> > by >>> > > > >> > > > the >>> > > > >> > > > > > >>> input >>> > > > >> > > > > > >>>> index IDs and can be stateless yet remain >>> > > deterministic. >>> > > > >> Are >>> > > > >> > you >>> > > > >> > > > > sure >>> > > > >> > > > > > it >>> > > > >> > > > > > >>>> is a good idea to mix them into the same API? We >>> > could >>> > > > >> think >>> > > > >> > of >>> > > > >> > > > > using >>> > > > >> > > > > > a >>> > > > >> > > > > > >>>> different name to make it less confusing for the >>> > users >>> > > > >> > > (something >>> > > > >> > > > > like >>> > > > >> > > > > > >>>> DataSupplierSource). >>> > > > >> > > > > > >>>> >>> > > > >> > > > > > >>>> Best, >>> > > > >> > > > > > >>>> Alexander Fedulov >>> > > > >> > > > > > >>>> >>> > > > >> > > > > > >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser < >>> > > > >> > > > > > martijnvis...@apache.org >>> > > > >> > > > > > >>>> >>> > > > >> > > > > > >>>> wrote: >>> > > > >> > > > > > >>>> >>> > > > >> > > > > > >>>>> Hi Alex, >>> > > > >> > > > > > >>>>> >>> > > > >> > > > > > >>>>> Thanks for creating the FLIP and opening up the >>> > > > >> discussion. >>> > > > >> > +1 >>> > > > >> > > > > > overall >>> > > > >> > > > > > >>> for >>> > > > >> > > > > > >>>>> getting this in place. >>> > > > >> > > > > > >>>>> >>> > > > >> > > > > > >>>>> One question: you've already mentioned that this >>> > > > focussed >>> > > > >> on >>> > > > >> > > the >>> > > > >> > > > > > >>>>> DataStream >>> > > > >> > > > > > >>>>> API. 
I think it would be a bit confusing that we have a Datagen connector
>>> > > > >> > > > > > >>>>> (on the Table side) that wouldn't leverage this target interface. I think
>>> > > > >> > > > > > >>>>> it would be good if we could already have one generic Datagen connector
>>> > > > >> > > > > > >>>>> which works for the DataStream API (so that would be a new one in the
>>> > > > >> > > > > > >>>>> Flink repo) and that the Datagen in the Table landscape is using this
>>> > > > >> > > > > > >>>>> target interface too. What do you think?
>>> > > > >> > > > > > >>>>>
>>> > > > >> > > > > > >>>>> Best regards,
>>> > > > >> > > > > > >>>>>
>>> > > > >> > > > > > >>>>> Martijn
>>> > > > >> > > > > > >>>>>
>>> > > > >> > > > > > >>>>> On Wed, Jun 8, 2022 at 2:21 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>>> > > > >> > > > > > >>>>>
>>> > > > >> > > > > > >>>>>> Hi Xianxun,
>>> > > > >> > > > > > >>>>>>
>>> > > > >> > > > > > >>>>>> Thanks for bringing it up. I do believe it would be useful to have such a
>>> > > > >> > > > > > >>>>>> CDC data generator but I see the efforts to provide one as a bit
>>> > > > >> > > > > > >>>>>> orthogonal to the DataSourceGenerator proposed in the FLIP.
FLIP-238 focuses >>> > > > >> > > > > > >>>>>> on the DataStream API and I could see >>> integration >>> > > into >>> > > > >> the >>> > > > >> > > > > Table/SQL >>> > > > >> > > > > > >>>>>> ecosystem as the next step that I would >>> > > > >> > > > > > >>>>>> prefer to keep separate (see KafkaDynamicSource >>> > > reusing >>> > > > >> > > > > > >>>>>> KafkaSource<RowData> >>> > > > >> > > > > > >>>>>> under the hood [1]). >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> [1] >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>> >>> > > > >> > > > > > >>> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223 >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> Best, >>> > > > >> > > > > > >>>>>> Alexander Fedulov >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye < >>> > > > >> > yxx_c...@163.com> >>> > > > >> > > > > > wrote: >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>>>> Hey Alexander, >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> Making datagen source connector easier to use >>> is >>> > > > really >>> > > > >> > > helpful >>> > > > >> > > > > > >>> during >>> > > > >> > > > > > >>>>>>> doing some PoC/Demo. 
>>> > > > >> > > > > > >>>>>>> And I thought about is it possible to produce >>> a >>> > > > >> changelog >>> > > > >> > > > stream >>> > > > >> > > > > by >>> > > > >> > > > > > >>>>>>> datagen source, so a new flink developer can >>> > > practice >>> > > > >> flink >>> > > > >> > > sql >>> > > > >> > > > > > >>> with >>> > > > >> > > > > > >>>>> cdc >>> > > > >> > > > > > >>>>>>> data using Flink SQL Client CLI. >>> > > > >> > > > > > >>>>>>> In the flink-examples-table module, a >>> > > > >> > ChangelogSocketExample >>> > > > >> > > > > > >>> class[1] >>> > > > >> > > > > > >>>>>>> describes how to ingest delete or insert data >>> by >>> > > 'nc' >>> > > > >> > > command. >>> > > > >> > > > > Can >>> > > > >> > > > > > >>> we >>> > > > >> > > > > > >>>>>>> support producing a changelog stream by the >>> new >>> > > > datagen >>> > > > >> > > source? >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> [1] >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>> >>> > > > >> > > > > > >>>>> >>> > > > >> > > > > > >>> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79 >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> Best regards, >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> Xianxun >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> On 06/8/2022 08:10,Alexander Fedulov< >>> > > > >> > alexan...@ververica.com >>> > > > >> > > > >>> > > > >> > > > > > >>>>>>> <alexan...@ververica.com> wrote: >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> I looked a bit further and it seems it should >>> > > actually >>> > > > >> be >>> > > > >> > > > easier >>> > > > >> > > > > > >>> than >>> > > > >> > > > > > >>>>> I >>> > > > >> > > > > > >>>>>>> initially 
thought: SourceReader extends >>> > > > >> CheckpointListener >>> > > > >> > > > > > >>> interface >>> > > > >> > > > > > >>>>> and >>> > > > >> > > > > > >>>>>>> with its custom implementation it should be >>> > possible >>> > > > to >>> > > > >> > > achieve >>> > > > >> > > > > > >>>>> similar >>> > > > >> > > > > > >>>>>>> results. A prototype that I have for the >>> generator >>> > > > uses >>> > > > >> an >>> > > > >> > > > > > >>>>>>> IteratorSourceReader >>> > > > >> > > > > > >>>>>>> under the hood by default but we could >>> consider >>> > > adding >>> > > > >> the >>> > > > >> > > > > ability >>> > > > >> > > > > > >>> to >>> > > > >> > > > > > >>>>>>> supply something like a >>> > > > DataGeneratorSourceReaderFactory >>> > > > >> > that >>> > > > >> > > > > would >>> > > > >> > > > > > >>>>> allow >>> > > > >> > > > > > >>>>>>> provisioning the DataGeneratorSource with >>> > customized >>> > > > >> > > > > > >>> implementations >>> > > > >> > > > > > >>>>> for >>> > > > >> > > > > > >>>>>>> cases like this. >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> Best, >>> > > > >> > > > > > >>>>>>> Alexander Fedulov >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander >>> Fedulov >>> > < >>> > > > >> > > > > > >>>>>> alexan...@ververica.com >>> > > > >> > > > > > >>>>>>>> >>> > > > >> > > > > > >>>>>>> wrote: >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> Hi Steven, >>> > > > >> > > > > > >>>>>>> >>> > > > >> > > > > > >>>>>>> This is going to be tricky since in the new >>> Source >>> > > API >>> > > > >> the >>> > > > >> > > > > > >>>>> checkpointing >>> > > > >> > > > > > >>>>>>> aspects that you based your logic on are >>> pushed >>> > > > further >>> > > > >> > away >>> > > > >> > > > from >>> > > > >> > > > > > >>> the >>> > > > >> > > > > > >>>>>>> low-level interfaces responsible for handling >>> data >>> > > and >>> > > > >> > splits >>> > > > >> > > > > [1]. 
At the same time, the SourceCoordinatorProvider is hardwired into the
internals of the framework, so I don't think it will be possible to provide
a customized implementation for testing purposes.

The only chance to tie data generation to checkpointing in the new Source
API that I see at the moment is via the SplitEnumerator serializer
(getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
possible to share a variable visible both to the generator function and to
the serializer and manipulate it whenever the serialize() method gets
called upon a checkpoint request. That said, you still won't get
notifications of successful checkpoints that you currently use (this info
is only available to the SourceCoordinator).
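To make the shared-variable idea more concrete, here is a minimal sketch in plain Java. It does not use the Flink API: CheckpointSerializerSketch and SharedCheckpointCounterSketch are hypothetical names invented for illustration, and the sketch assumes that the serializer and the generator function run in the same JVM (for example, in a local test), which would not hold in a distributed deployment.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

/** Hypothetical stand-in for an enumerator checkpoint serializer; not a Flink interface. */
interface CheckpointSerializerSketch<S> {
    byte[] serialize(S state);
}

/** Holds one counter that both the serializer and the generator function can see. */
final class SharedCheckpointCounterSketch {
    // Assumption: both sides live in the same JVM, so they observe the same object.
    final AtomicInteger checkpointRequests = new AtomicInteger();

    /** Serializer that bumps the shared counter every time serialize() is called. */
    CheckpointSerializerSketch<String> serializer() {
        return state -> {
            checkpointRequests.incrementAndGet();
            return state.getBytes(StandardCharsets.UTF_8);
        };
    }

    /** Generator function that tags each event with the number of checkpoint requests seen so far. */
    LongFunction<String> generator() {
        return index -> "cycle-" + checkpointRequests.get() + "-event-" + index;
    }
}
```

The counter only reflects checkpoint requests, not completed checkpoints, so it has the same limitation mentioned above: there is no signal that a checkpoint actually succeeded.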

In general, regardless of the generator implementation itself, the new
Source API does not seem to support the use case of verifying checkpoint
contents in lockstep with produced data; at least I do not see an immediate
solution for this. Can you think of a different way of checking the
correctness of the Iceberg Sink implementation that does not rely on this
approach?

Best,
Alexander Fedulov

[1]
https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337

[2]
https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97

On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:

In the Iceberg source, we have a data generator source that can control the
records per checkpoint cycle. Can we support something like this in the
DataGeneratorSource?

https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)

Thanks,
Steven

On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
wrote:

Hi everyone,

I would like to open a discussion
on FLIP-238: Introduce FLIP-27-based Data Generator Source [1]. During the
discussion about deprecating the SourceFunction API [2] it became evident
that an easy-to-use FLIP-27-compatible data generator source is needed so
that the current SourceFunction-based data generator implementations could
be phased out for both Flink demo/PoC applications and for the internal
Flink tests. This FLIP proposes to introduce a generic DataGeneratorSource
capable of producing events of an arbitrary type based on a user-supplied
MapFunction.

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/9Av1D
[2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

Best,
Alexander Fedulov
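
As a rough illustration of what "producing events of an arbitrary type based on a user-supplied function" could look like, here is a plain-Java sketch. It is not the proposed DataGeneratorSource and does not use Flink's MapFunction: MapBasedGeneratorSketch is a hypothetical name, java.util.function.Function stands in for the user-supplied function, and the assumption that the function maps a sequence index (0, 1, 2, ...) to an event is mine rather than something stated in this thread.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Hypothetical bounded generator: turns the indices 0..count-1 into events of type T. */
final class MapBasedGeneratorSketch<T> {
    private final Function<Long, T> mapper; // user-supplied index-to-event function
    private final long count;               // number of events to produce

    MapBasedGeneratorSketch(Function<Long, T> mapper, long count) {
        this.mapper = mapper;
        this.count = count;
    }

    /** Applies the mapper to each index in order and collects the resulting events. */
    List<T> generate() {
        return LongStream.range(0, count)
                .boxed()
                .map(mapper)
                .collect(Collectors.toList());
    }
}
```

With this shape, the event type is determined entirely by the function the user passes in, for example `new MapBasedGeneratorSketch<String>(i -> "event-" + i, 3).generate()`.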