I had to do something similar recently for FLIP-238 (generator source) [1]. The PoC [2] reuses the NumberSequenceSource to produce other data types based on a user-defined mapper. The mapping happens within the SourceReader. Here are some relevant classes [3], [4]. Not sure if this is the best approach for your case, but it could hint at one possible direction.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source [2] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source [3] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java [4] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java#L135 Best, Alexander Fedulov On Mon, Jul 18, 2022 at 8:01 PM Salva Alcántara <salcantara...@gmail.com> wrote: > Yep, that is mostly it. I have (DataStream) connector (sources & sink) > which works for a fixed type (`JsonNode` for what it's worth) as you say > and I want to reuse it for Table/SQL, which requires working with `DataRow` > as the underlying data type. But even beyond that specific use case, I > think being able of genering sinks/sources out of existing ones makes > sense. Essentially, what I'd like is to make sources & sinks functorial by > attaching a `map` method, but maybe I should not even need this in the > first place and there are more idiomatic/correct ways of approaching this > within Flink. > > On Mon, Jul 18, 2022 at 5:33 PM Alexander Fedulov <alexan...@ververica.com> > wrote: > >> Hi Salva, >> >> what is the goal? Do you have some source that already has a fixed type >> and you want to reuse its functionality for producing a different data type? >> >> Best, >> Alexander Fedulov >> >> >> On Mon, Jul 18, 2022 at 1:29 PM Salva Alcántara <salcantara...@gmail.com> >> wrote: >> >>> If I have a Source<A> (Sink<A>), what would be the simplest way of >>> obtaining a Source<B> (Sink<B>) based on a mapping/conversion function from >>> A to B. AFAIK sources & sinks don't have map so I was just wondering how to >>> approach this in the context of new sources/sinks apis. >>> >>> Regards, >>> >>> Salva >>> >>