Hi Becket,

I agree with you. We could introduce a *ReadOnlyRuntimeContext* that would
act as a holder for the *RuntimeContext* data. This would also require
read-only wrappers for the exposed fields, such as *ExecutionConfig*.
Alternatively, we just add the *currentParallelism()* method for now and
see if anything else might actually be needed later on. What do you think?

Best,
Alexander Fedulov

On Tue, Jul 5, 2022 at 2:30 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Alex,
>
> While it is true that the RuntimeContext gives access to all the stuff the
> framework can provide, it seems a little overkilling for the SourceReader.
> It is probably OK to expose all the read-only information in the
> RuntimeContext to the SourceReader, but we may want to hide the "write"
> methods, such as creating states, writing stuff to distributed cache, etc,
> because these methods may not work well with the SourceReader design and
> cause confusion. For example, users may wonder why the snapshotState()
> method exists while they can use the state directly.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, Jul 5, 2022 at 7:37 AM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
> > Hi Becket,
> >
> > I updated and extended FLIP-238 accordingly.
> >
> > Here is also my POC branch [1].
> > DataGeneratorSourceV3 is the class that I currently converged on [2]. It
> is
> > based on the expanded SourceReaderContext.
> > A couple more relevant classes [3] [4]
> >
> > Would appreciate it if you could take a quick look.
> >
> > [1]  https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
> > [2]
> >
> >
> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
> > [3]
> >
> >
> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
> > [4]
> >
> >
> https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java
> >
> > Best,
> > Alexander Fedulov
> >
> > On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov <
> alexan...@ververica.com
> > >
> > wrote:
> >
> > > Hi Becket,
> > >
> > > Exposing the RuntimeContext is potentially even more useful.
> > > Do you think it is worth having both currentParallelism() and
> > >  getRuntimeContext() methods?
> > > One can always call getNumberOfParallelSubtasks() on the RuntimeContext
> > > directly if we expose it.
> > >
> > > Best,
> > > Alexander Fedulov
> > >
> > >
> > > On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com>
> wrote:
> > >
> > >> Hi Alex,
> > >>
> > >> Yes, that is what I had in mind. We need to add the method
> > >> getRuntimeContext() to the SourceReaderContext interface as well.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov <
> > alexan...@ververica.com
> > >> >
> > >> wrote:
> > >>
> > >> > Hi Becket,
> > >> >
> > >> > thanks for your input. I like the idea of adding the parallelism to
> > the
> > >> > SourceReaderContext. My understanding is that any change of
> > parallelism
> > >> > causes recreation of all readers, so it should be safe to consider
> it
> > >> > "fixed" after the readers' initialization. In that case, it should
> be
> > as
> > >> > simple as adding the following to the anonymous SourceReaderContext
> > >> > implementation
> > >> > in SourceOperator#initReader():
> > >> >
> > >> > public int currentParallelism() {
> > >> >    return getRuntimeContext().getNumberOfParallelSubtasks();
> > >> > }
> > >> >
> > >> > Is that what you had in mind?
> > >> >
> > >> > Best,
> > >> > Alexander Fedulov
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Jul 1, 2022 at 11:30 AM Becket Qin <becket....@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Hi Alex,
> > >> > >
> > >> > > In FLIP-27 source, the SourceReader can get a SourceReaderContext.
> > >> This
> > >> > is
> > >> > > passed in by the TM in Source#createReader(). And supposedly the
> > >> Source
> > >> > > should pass this to the SourceReader if needed.
> > >> > >
> > >> > > In the SourceReaderContext, currently only the index of the
> current
> > >> > subtask
> > >> > > is available, but we can probably add the current parallelism as
> > well.
> > >> > This
> > >> > > would be a change that affects all the Sources, not only for the
> > data
> > >> > > generator source. Perhaps we can have a simple separate FLIP.
> > >> > >
> > >> > > Regarding the semantic of rate limiting, for the rate limit
> source,
> > >> > > personally I feel intuitive to keep the global rate untouched on
> > >> scaling.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jiangjie (Becket) Qin
> > >> > >
> > >> > > On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov <
> > >> > alexan...@ververica.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi all,
> > >> > > >
> > >> > > > getting back to the idea of reusing FlinkConnectorRateLimiter:
> it
> > is
> > >> > > > designed for the SourceFunction API and has an open() method
> that
> > >> > takes a
> > >> > > > RuntimeContext. Therefore, we need to add a different interface
> > for
> > >> > > > the new Source
> > >> > > > API.
> > >> > > >
> > >> > > > This is where I see a certain limitation for the rate-limiting
> use
> > >> > case:
> > >> > > in
> > >> > > > the old API the individual readers were able to retrieve the
> > current
> > >> > > > parallelism from the RuntimeContext. In the new API, this is not
> > >> > > supported,
> > >> > > > the information about the parallelism is only available in the
> > >> > > > SplitEnumeratorContext to which the readers do not have access.
> > >> > > >
> > >> > > > I see two possibilities:
> > >> > > > 1. Add an optional RateLimiter parameter to the
> > DataGeneratorSource
> > >> > > > constructor. The RateLimiter is then "fixed" and has to be fully
> > >> > > configured
> > >> > > > by the user in the main method.
> > >> > > > 2. Piggy-back on Splits: add parallelism as a field of a Split.
> > The
> > >> > > > initialization of this field would happen dynamically upon
> splits
> > >> > > creation
> > >> > > > in the createEnumerator() method where currentParallelism is
> > >> available.
> > >> > > >
> > >> > > > The second approach makes implementation rather significantly
> more
> > >> > > > complex since we cannot simply wrap
> > >> > NumberSequenceSource.SplitSerializer
> > >> > > in
> > >> > > > that case. The advantage of this approach is that with any kind
> of
> > >> > > > autoscaling, the source rate will match the original
> > configuration.
> > >> But
> > >> > > I'm
> > >> > > > not sure how useful this is. I can even imagine scenarios where
> > >> scaling
> > >> > > the
> > >> > > > input rate together with parallelism would be better for demo
> > >> purposes.
> > >> > > >
> > >> > > > Would be glad to hear your thoughts on this.
> > >> > > >
> > >> > > > Best,
> > >> > > > Alexander Fedulov
> > >> > > >
> > >> > > > On Mon, Jun 20, 2022 at 4:31 PM David Anderson <
> > >> dander...@apache.org>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > I'm very happy with this. +1
> > >> > > > >
> > >> > > > > A lot of SourceFunction implementations used in demos/POC
> > >> > > implementations
> > >> > > > > include a call to sleep(), so adding rate limiting is a good
> > >> idea, in
> > >> > > my
> > >> > > > > opinion.
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > David
> > >> > > > >
> > >> > > > > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <
> > >> renqs...@gmail.com>
> > >> > > > wrote:
> > >> > > > >
> > >> > > > > > Hi Alexander,
> > >> > > > > >
> > >> > > > > > Thanks for creating this FLIP! I’d like to share some
> > thoughts.
> > >> > > > > >
> > >> > > > > > 1. About the “generatorFunction” I’m expecting an
> initializer
> > >> on it
> > >> > > > > > because it’s hard to require all fields in the generator
> > >> function
> > >> > are
> > >> > > > > > serializable in user’s implementation. Providing a function
> > like
> > >> > > “open”
> > >> > > > > in
> > >> > > > > > the interface could let the function to make some
> > >> initializations
> > >> > in
> > >> > > > the
> > >> > > > > > task initializing stage.
> > >> > > > > >
> > >> > > > > > 2. As of the throttling functinality you mentioned, there’s
> a
> > >> > > > > > FlinkConnectorRateLimiter under flink-core and maybe we
> could
> > >> reuse
> > >> > > > this
> > >> > > > > > interface. Actually I prefer to make rate limiting as a
> common
> > >> > > feature
> > >> > > > > > provided in the Source API, but this requires another FLIP
> > and a
> > >> > lot
> > >> > > of
> > >> > > > > > discussions so I’m OK to have it in the DataGen source
> first.
> > >> > > > > >
> > >> > > > > > Best regards,
> > >> > > > > > Qingsheng
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > > On Jun 17, 2022, at 01:47, Alexander Fedulov <
> > >> > > > alexan...@ververica.com>
> > >> > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > Hi Jing,
> > >> > > > > > >
> > >> > > > > > > thanks for your thorough analysis. I agree with the points
> > you
> > >> > make
> > >> > > > and
> > >> > > > > > > also with the idea to approach the larger task of
> providing
> > a
> > >> > > > universal
> > >> > > > > > > (DataStream + SQL) data generator base iteratively.
> > >> > > > > > > Regarding the name, the SourceFunction-based
> > >> > *DataGeneratorSource*
> > >> > > > > > resides
> > >> > > > > > > in the
> > >> > *org.apache.flink.streaming.api.functions.source.datagen*. I
> > >> > > > > think
> > >> > > > > > > it is OK to simply place the new one (with the same name)
> > >> next to
> > >> > > the
> > >> > > > > > > *NumberSequenceSource* into
> > >> > > > > *org.apache.flink.api.connector.source.lib*.
> > >> > > > > > >
> > >> > > > > > > One more thing I wanted to discuss:  I noticed that
> > >> > > > *DataGenTableSource
> > >> > > > > > *has
> > >> > > > > > > built-in throttling functionality (*rowsPerSecond*). I
> > >> believe it
> > >> > > is
> > >> > > > > > > something that could be also useful for the DataStream
> users
> > >> of
> > >> > the
> > >> > > > > > > stateless data generator and since we want to eventually
> > >> converge
> > >> > > on
> > >> > > > > the
> > >> > > > > > > same implementation for DataStream and Table/SQL it sounds
> > >> like a
> > >> > > > good
> > >> > > > > > idea
> > >> > > > > > > to add it to the FLIP. What do you think?
> > >> > > > > > >
> > >> > > > > > > Best,
> > >> > > > > > > Alexander Fedulov
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <
> j...@ververica.com
> > >
> > >> > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Hi,
> > >> > > > > > >>
> > >> > > > > > >> After reading all discussions posted in this thread and
> the
> > >> > source
> > >> > > > > code
> > >> > > > > > of
> > >> > > > > > >> DataGeneratorSource which unfortunately used "Source"
> > >> instead of
> > >> > > > > > >> "SourceFunction" in its name, issues could summarized as
> > >> > > following:
> > >> > > > > > >>
> > >> > > > > > >> 1. The current DataGeneratorSource based on
> SourceFunction
> > >> is a
> > >> > > > > stateful
> > >> > > > > > >> source connector and built for Table/SQL.
> > >> > > > > > >> 2. The right name for the new data generator source i.e.
> > >> > > > > > >> DataGeneratorSource has been used for the current
> > >> implementation
> > >> > > > based
> > >> > > > > > on
> > >> > > > > > >> SourceFunction.
> > >> > > > > > >> 3. A new data generator source should be developed based
> on
> > >> the
> > >> > > new
> > >> > > > > > Source
> > >> > > > > > >> API.
> > >> > > > > > >> 4. The new data generator source should be used both for
> > >> > > DataStream
> > >> > > > > and
> > >> > > > > > >> Table/SQL, which means the current DataGeneratorSource
> > >> should be
> > >> > > > > > replaced
> > >> > > > > > >> with the new one.
> > >> > > > > > >> 5. The core event generation logic should be pluggable to
> > >> > support
> > >> > > > > > various
> > >> > > > > > >> (test) scenarios, e.g. rondom stream, changlog stream,
> > >> > > controllable
> > >> > > > > > events
> > >> > > > > > >> per checkpoint, etc.
> > >> > > > > > >>
> > >> > > > > > >> which turns out that
> > >> > > > > > >>
> > >> > > > > > >> To solve 1+3+4 -> we will have to make a big effort to
> > >> replace
> > >> > the
> > >> > > > > > current
> > >> > > > > > >> DataGeneratorSource since the new Source API has a very
> > >> > different
> > >> > > > > > >> concept, especially for the stateful part.
> > >> > > > > > >> To solve 2+3 -> we have to find another name for the new
> > >> > > > > implementation.
> > >> > > > > > >> To solve 1+3+4+5 -> It gets even more complicated to
> > support
> > >> > > > stateless
> > >> > > > > > and
> > >> > > > > > >> stateful scenarios simultaneously with one solution.
> > >> > > > > > >>
> > >> > > > > > >> If we want to solve all of these issues in one shot, It
> > might
> > >> > take
> > >> > > > > > months.
> > >> > > > > > >> Therefore, I would suggest starting from small and
> growing
> > up
> > >> > > > > > iteratively.
> > >> > > > > > >>
> > >> > > > > > >> The proposal for the kickoff is to focus on stateless
> event
> > >> > > > generation
> > >> > > > > > >> with e.g. rondom stream and use the name
> > >> > > > > "StatelessDataGeneratoSource".
> > >> > > > > > >> The will be a period of time that both
> DataGeneratorSource
> > >> will
> > >> > be
> > >> > > > > used
> > >> > > > > > by
> > >> > > > > > >> the developer. The current DataGeneratorSource will be
> then
> > >> > > > > deprecated,
> > >> > > > > > >> once we can(iteratively):
> > >> > > > > > >> 1. either enlarge the scope of
> StatelessDataGeneratoSourcer
> > >> to
> > >> > be
> > >> > > > able
> > >> > > > > > to
> > >> > > > > > >> cover stateful scenarios and renaming it to
> > >> > > > > > "DataGeneratorSourceV2"(follow
> > >> > > > > > >> the naming convention of SinkV2) or
> > >> > > > > > >> 2. develop a new "SatefullDataGeneratorSource" based on
> > >> Source
> > >> > API
> > >> > > > > which
> > >> > > > > > >> can handle the stateful scenarios, if it is impossible to
> > >> > support
> > >> > > > both
> > >> > > > > > >> stateless and stateful scenarios with one GeneratorSource
> > >> > > > > > implementation.
> > >> > > > > > >>
> > >> > > > > > >> Best regards,
> > >> > > > > > >> Jing
> > >> > > > > > >>
> > >> > > > > > >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <
> > >> > > > > martijnvis...@apache.org
> > >> > > > > > >
> > >> > > > > > >> wrote:
> > >> > > > > > >>
> > >> > > > > > >>> Hey Alex,
> > >> > > > > > >>>
> > >> > > > > > >>> Yes, I think we need to make sure that we're not causing
> > >> > > confusion
> > >> > > > (I
> > >> > > > > > know
> > >> > > > > > >>> I already was confused). I think the DataSupplierSource
> is
> > >> > > already
> > >> > > > > > better,
> > >> > > > > > >>> but perhaps there are others who have an even better
> idea.
> > >> > > > > > >>>
> > >> > > > > > >>> Thanks,
> > >> > > > > > >>>
> > >> > > > > > >>> Martijn
> > >> > > > > > >>>
> > >> > > > > > >>> Op do 9 jun. 2022 om 14:28 schreef Alexander Fedulov <
> > >> > > > > > >>> alexan...@ververica.com>:
> > >> > > > > > >>>
> > >> > > > > > >>>> Hi Martijn,
> > >> > > > > > >>>>
> > >> > > > > > >>>> It seems that they serve a bit different purposes
> though.
> > >> The
> > >> > > > > > >>>> DataGenTableSource is for generating random data
> > described
> > >> by
> > >> > > the
> > >> > > > > > Table
> > >> > > > > > >>>> DDL and is tied into the RowDataGenerator/DataGenerator
> > >> > concept
> > >> > > > > which
> > >> > > > > > is
> > >> > > > > > >>>> implemented as an Iterator<T>.  The proposed API in
> > >> contrast
> > >> > is
> > >> > > > > > supposed
> > >> > > > > > >>>> to provide users with an easy way to supply their
> custom
> > >> data.
> > >> > > > > Another
> > >> > > > > > >>>> difference is that a DataGenerator is supposed to be
> > >> stateful
> > >> > > and
> > >> > > > > has
> > >> > > > > > to
> > >> > > > > > >>>> snapshot its state, whereas the proposed API is purely
> > >> driven
> > >> > by
> > >> > > > the
> > >> > > > > > >>> input
> > >> > > > > > >>>> index IDs and can be stateless yet remain
> deterministic.
> > >> Are
> > >> > you
> > >> > > > > sure
> > >> > > > > > it
> > >> > > > > > >>>> is a good idea to mix them into the same API? We could
> > >> think
> > >> > of
> > >> > > > > using
> > >> > > > > > a
> > >> > > > > > >>>> different name to make it less confusing for the users
> > >> > > (something
> > >> > > > > like
> > >> > > > > > >>>> DataSupplierSource).
> > >> > > > > > >>>>
> > >> > > > > > >>>> Best,
> > >> > > > > > >>>> Alexander Fedulov
> > >> > > > > > >>>>
> > >> > > > > > >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <
> > >> > > > > > martijnvis...@apache.org
> > >> > > > > > >>>>
> > >> > > > > > >>>> wrote:
> > >> > > > > > >>>>
> > >> > > > > > >>>>> Hi Alex,
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Thanks for creating the FLIP and opening up the
> > >> discussion.
> > >> > +1
> > >> > > > > > overall
> > >> > > > > > >>> for
> > >> > > > > > >>>>> getting this in place.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> One question: you've already mentioned that this
> > focussed
> > >> on
> > >> > > the
> > >> > > > > > >>>>> DataStream
> > >> > > > > > >>>>> API. I think it would be a bit confusing that we have
> a
> > >> > Datagen
> > >> > > > > > >>> connector
> > >> > > > > > >>>>> (on the Table side) that wouldn't leverage this target
> > >> > > > interface. I
> > >> > > > > > >>> think
> > >> > > > > > >>>>> it would be good if we could already have one generic
> > >> Datagen
> > >> > > > > > connector
> > >> > > > > > >>>>> which works for both DataStream API (so that would be
> a
> > >> new
> > >> > one
> > >> > > > in
> > >> > > > > > the
> > >> > > > > > >>>>> Flink repo) and that the Datagen in the Table
> landscape
> > is
> > >> > > using
> > >> > > > > this
> > >> > > > > > >>>>> target interface too. What do you think?
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Best regards,
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Martijn
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Op wo 8 jun. 2022 om 14:21 schreef Alexander Fedulov <
> > >> > > > > > >>>>> alexan...@ververica.com>:
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>> Hi Xianxun,
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> Thanks for bringing it up. I do believe it would be
> > >> useful
> > >> > to
> > >> > > > have
> > >> > > > > > >>> such
> > >> > > > > > >>>>> a
> > >> > > > > > >>>>>> CDC data generator but I see the
> > >> > > > > > >>>>>> efforts to provide one a bit orthogonal to the
> > >> > > > DataSourceGenerator
> > >> > > > > > >>>>> proposed
> > >> > > > > > >>>>>> in the FLIP. FLIP-238 focuses
> > >> > > > > > >>>>>> on the DataStream API and I could see integration
> into
> > >> the
> > >> > > > > Table/SQL
> > >> > > > > > >>>>>> ecosystem as the next step that I would
> > >> > > > > > >>>>>> prefer to keep separate (see KafkaDynamicSource
> reusing
> > >> > > > > > >>>>>> KafkaSource<RowData>
> > >> > > > > > >>>>>> under the hood [1]).
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> [1]
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> Best,
> > >> > > > > > >>>>>> Alexander Fedulov
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <
> > >> > yxx_c...@163.com>
> > >> > > > > > wrote:
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>> Hey Alexander,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Making datagen source connector easier to use is
> > really
> > >> > > helpful
> > >> > > > > > >>> during
> > >> > > > > > >>>>>>> doing some PoC/Demo.
> > >> > > > > > >>>>>>> And I thought about is it possible to produce a
> > >> changelog
> > >> > > > stream
> > >> > > > > by
> > >> > > > > > >>>>>>> datagen source, so a new flink developer can
> practice
> > >> flink
> > >> > > sql
> > >> > > > > > >>> with
> > >> > > > > > >>>>> cdc
> > >> > > > > > >>>>>>> data using Flink SQL Client CLI.
> > >> > > > > > >>>>>>> In the flink-examples-table module, a
> > >> > ChangelogSocketExample
> > >> > > > > > >>> class[1]
> > >> > > > > > >>>>>>> describes how to ingest delete or insert data by
> 'nc'
> > >> > > command.
> > >> > > > > Can
> > >> > > > > > >>> we
> > >> > > > > > >>>>>>> support producing a changelog stream by the new
> > datagen
> > >> > > source?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [1]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Best regards,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Xianxun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On 06/8/2022 08:10,Alexander Fedulov<
> > >> > alexan...@ververica.com
> > >> > > >
> > >> > > > > > >>>>>>> <alexan...@ververica.com> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I looked a bit further and it seems it should
> actually
> > >> be
> > >> > > > easier
> > >> > > > > > >>> than
> > >> > > > > > >>>>> I
> > >> > > > > > >>>>>>> initially thought:  SourceReader extends
> > >> CheckpointListener
> > >> > > > > > >>> interface
> > >> > > > > > >>>>> and
> > >> > > > > > >>>>>>> with its custom implementation it should be possible
> > to
> > >> > > achieve
> > >> > > > > > >>>>> similar
> > >> > > > > > >>>>>>> results. A prototype that I have for the generator
> > uses
> > >> an
> > >> > > > > > >>>>>>> IteratorSourceReader
> > >> > > > > > >>>>>>> under the hood by default but we could consider
> adding
> > >> the
> > >> > > > > ability
> > >> > > > > > >>> to
> > >> > > > > > >>>>>>> supply something like a
> > DataGeneratorSourceReaderFactory
> > >> > that
> > >> > > > > would
> > >> > > > > > >>>>> allow
> > >> > > > > > >>>>>>> provisioning the DataGeneratorSource with customized
> > >> > > > > > >>> implementations
> > >> > > > > > >>>>> for
> > >> > > > > > >>>>>>> cases like this.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Best,
> > >> > > > > > >>>>>>> Alexander Fedulov
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <
> > >> > > > > > >>>>>> alexan...@ververica.com
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Steven,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> This is going to be tricky since in the new Source
> API
> > >> the
> > >> > > > > > >>>>> checkpointing
> > >> > > > > > >>>>>>> aspects that you based your logic on are pushed
> > further
> > >> > away
> > >> > > > from
> > >> > > > > > >>> the
> > >> > > > > > >>>>>>> low-level interfaces responsible for handling data
> and
> > >> > splits
> > >> > > > > [1].
> > >> > > > > > >>> At
> > >> > > > > > >>>>> the
> > >> > > > > > >>>>>>> same time, the SourceCoordinatorProvider is
> hardwired
> > >> into
> > >> > > the
> > >> > > > > > >>>>> internals
> > >> > > > > > >>>>>>> of the framework, so I don't think it will be
> possible
> > >> to
> > >> > > > > provide a
> > >> > > > > > >>>>>>> customized implementation for testing purposes.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The only chance to tie data generation to
> > checkpointing
> > >> in
> > >> > > the
> > >> > > > > new
> > >> > > > > > >>>>> Source
> > >> > > > > > >>>>>>> API that I see at the moment is via the
> > SplitEnumerator
> > >> > > > > serializer
> > >> > > > > > >>> (
> > >> > > > > > >>>>>>> getEnumeratorCheckpointSerializer() method) [2]. In
> > >> theory,
> > >> > > it
> > >> > > > > > >>> should
> > >> > > > > > >>>>> be
> > >> > > > > > >>>>>>> possible to share a variable visible both to the
> > >> generator
> > >> > > > > function
> > >> > > > > > >>>>> and
> > >> > > > > > >>>>>> to
> > >> > > > > > >>>>>>> the serializer and manipulate it whenever the
> > >> serialize()
> > >> > > > method
> > >> > > > > > >>> gets
> > >> > > > > > >>>>>>> called upon a checkpoint request. That said, you
> still
> > >> > won't
> > >> > > > get
> > >> > > > > > >>>>>>> notifications of successful checkpoints that you
> > >> currently
> > >> > > use
> > >> > > > > > >>> (this
> > >> > > > > > >>>>> info
> > >> > > > > > >>>>>>> is only available to the SourceCoordinator).
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> In general, regardless of the generator
> implementation
> > >> > > itself,
> > >> > > > > the
> > >> > > > > > >>> new
> > >> > > > > > >>>>>>> Source
> > >> > > > > > >>>>>>> API does not seem to support the use case of
> verifying
> > >> > > > > checkpoints
> > >> > > > > > >>>>>>> contents in lockstep with produced data, at least I
> do
> > >> not
> > >> > > see
> > >> > > > an
> > >> > > > > > >>>>>> immediate
> > >> > > > > > >>>>>>> solution for this. Can you think of a different way
> of
> > >> > > checking
> > >> > > > > the
> > >> > > > > > >>>>>>> correctness of the Iceberg Sink implementation that
> > does
> > >> > not
> > >> > > > rely
> > >> > > > > > >>> on
> > >> > > > > > >>>>> this
> > >> > > > > > >>>>>>> approach?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Best,
> > >> > > > > > >>>>>>> Alexander Fedulov
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [1]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [2]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <
> > >> > > stevenz...@gmail.com
> > >> > > > >
> > >> > > > > > >>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> In Iceberg source, we have a data generator source
> > that
> > >> can
> > >> > > > > control
> > >> > > > > > >>>>> the
> > >> > > > > > >>>>>>> records per checkpoint cycle. Can we support sth
> like
> > >> this
> > >> > in
> > >> > > > the
> > >> > > > > > >>>>>>> DataGeneratorSource?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> > >> > > > > > >>>>>>> public BoundedTestSource(List<List<T>>
> > >> > elementsPerCheckpoint,
> > >> > > > > > >>> boolean
> > >> > > > > > >>>>>>> checkpointEnabled)
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> Steven
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <
> > >> > > > > > >>>>>> alexan...@ververica.com
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi everyone,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I would like to open a discussion on FLIP-238:
> > Introduce
> > >> > > > > > >>> FLIP-27-based
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Data
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Generator Source [1]. During the discussion about
> > >> > deprecating
> > >> > > > the
> > >> > > > > > >>>>>>> SourceFunction API [2] it became evident that an
> > >> > easy-to-use
> > >> > > > > > >>>>>>> FLIP-27-compatible data generator source is needed
> so
> > >> that
> > >> > > the
> > >> > > > > > >>> current
> > >> > > > > > >>>>>>> SourceFunction-based data generator implementations
> > >> could
> > >> > be
> > >> > > > > phased
> > >> > > > > > >>>>> out
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> for
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> both Flink demo/PoC applications and for the
> internal
> > >> Flink
> > >> > > > > tests.
> > >> > > > > > >>>>> This
> > >> > > > > > >>>>>>> FLIP proposes to introduce a generic
> > DataGeneratorSource
> > >> > > > capable
> > >> > > > > of
> > >> > > > > > >>>>>>> producing events of an arbitrary type based on a
> > >> > > user-supplied
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> MapFunction.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Looking forward to your feedback.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [1] https://cwiki.apache.org/confluence/x/9Av1D
> > >> > > > > > >>>>>>> [2]
> > >> > > > > > >>>
> > >> > https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Best,
> > >> > > > > > >>>>>>> Alexander Fedulov
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>>
> > >> > > > > > >>>
> > >> > > > > > >>
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Reply via email to