I had to do something similar recently for FLIP-238 (generator source) [1].
The PoC [2] reuses the NumberSequenceSource to produce other data types
based on a user-defined mapper. The mapping happens within the
SourceReader. Here are some relevant classes [3], [4]. Not sure if this is
the best approach for your case, but it could hint at one possible
direction.
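
To make the idea concrete, here is a minimal sketch in plain Java. It is
not the PoC code and deliberately uses no Flink interfaces: a plain
Iterator stands in for the reader, and `MappingIterator` and
`MappingSketch` are hypothetical names chosen for illustration. The point
is only that the existing sequence is reused unchanged and the
user-defined mapper is applied per element as it is read.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.LongStream;

/** Hypothetical helper: exposes an Iterator<A> as an Iterator<B> via a user-defined mapper. */
final class MappingIterator<A, B> implements Iterator<B> {
    private final Iterator<A> source;
    private final Function<? super A, ? extends B> mapper;

    MappingIterator(Iterator<A> source, Function<? super A, ? extends B> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        // Availability is decided entirely by the wrapped iterator.
        return source.hasNext();
    }

    @Override
    public B next() {
        // The conversion happens lazily, one element at a time.
        return mapper.apply(source.next());
    }
}

/** Hypothetical stand-in for "number sequence + mapper = generator of any type". */
final class MappingSketch {
    private MappingSketch() {}

    /** Maps the numbers from..to (inclusive) through the mapper and collects the results. */
    static <B> List<B> generate(long from, long to, Function<Long, B> mapper) {
        Iterator<Long> numbers = LongStream.rangeClosed(from, to).boxed().iterator();
        Iterator<B> mapped = new MappingIterator<>(numbers, mapper);

        List<B> out = new ArrayList<>();
        while (mapped.hasNext()) {
            out.add(mapped.next());
        }
        return out;
    }
}
```

For example, `MappingSketch.generate(1, 3, n -> "event-" + n)` yields a
list of three strings built from the numbers 1 to 3. A sink would need the
opposite direction: to get a Sink<B> from a Sink<A>, the function has to
go from B to A.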

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source
[2] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
[3] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
[4] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java#L135

Best,
Alexander Fedulov

On Mon, Jul 18, 2022 at 8:01 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Yep, that is mostly it. I have a (DataStream) connector (sources & sinks)
> which works for a fixed type (`JsonNode`, for what it's worth), as you say,
> and I want to reuse it for Table/SQL, which requires working with `RowData`
> as the underlying data type. But even beyond that specific use case, I
> think being able to generate sinks/sources out of existing ones makes
> sense. Essentially, what I'd like is to make sources & sinks functorial by
> attaching a `map` method, but maybe I should not even need this in the
> first place and there are more idiomatic/correct ways of approaching this
> within Flink.
>
> On Mon, Jul 18, 2022 at 5:33 PM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
>> Hi Salva,
>>
>> What is the goal? Do you have some source that already has a fixed type
>> and you want to reuse its functionality for producing a different data type?
>>
>> Best,
>> Alexander Fedulov
>>
>>
>> On Mon, Jul 18, 2022 at 1:29 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> If I have a Source<A> (Sink<A>), what would be the simplest way of
>>> obtaining a Source<B> (Sink<B>) based on a mapping/conversion function from
>>> A to B? AFAIK sources & sinks don't have `map`, so I was just wondering how
>>> to approach this in the context of the new source/sink APIs.
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>
