Thanks for replying, Fabian. However, I am using only KafkaSource.

The code looks roughly like this:

class MetricSource {
    // Message types that should pass the filter
    final Set<String> metricSdms = new HashSet<>();
    ...
    env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
        .name(MetricSource.class.getSimpleName())
        .uid(MetricSource.class.getSimpleName())
        .filter(sdm -> metricSdms.contains(sdm.getType()));
}

class MetricKafkaSourceFactory {
    public static FlinkKafkaConsumer<SelfDescribingMessageDO> createConsumer(
            final Configuration jobParams) {
        ...
        return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
    }
}


On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul <fabianp...@data-artisans.com>
wrote:

> Hi Debraj,
>
> By Source Legacy Thread we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and FileSource
> are already migrated. In general, there is no severe downside to using the
> older sources, but in the future we plan to implement only sources based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>
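
For comparison, here is a minimal sketch of a source built on the new interface from [1], assuming Flink 1.13 or later with the flink-connector-kafka dependency on the classpath. The class name, broker address, topic, group id and the use of SimpleStringSchema are placeholders for illustration and are not taken from the job above, which uses its own deserialization schema.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name; not part of the original job.
public class KafkaSourceSketch {

    // Builds a KafkaSource (new source interface). All arguments are placeholders.
    static KafkaSource<String> buildSource(String brokers, String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Assumption: plain string values; a real job would plug in its own schema.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = buildSource("localhost:9092", "metrics", "metric-source");

        // fromSource (rather than addSource) is the entry point for sources on the new interface.
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");

        stream.print();
        env.execute("kafka-source-sketch");
    }
}
```

A job wired this way goes through env.fromSource instead of env.addSource, so it would not run in the legacy source thread.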
