Re: "Legacy Source Thread" line in logs
Thanks again, Fabian, for the clarification.

On Thu, Jun 24, 2021 at 8:16 PM Fabian Paul wrote:
> Hi Debraj,
> [...]
Re: "Legacy Source Thread" line in logs
Hi Debraj,

Sorry for the confusion: the FlinkKafkaConsumer is the old source, and you can find the overhauled one here [1]. You would need to replace the FlinkKafkaConsumer with the KafkaSource for the message to go away.

Best
Fabian

[1] https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75
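As a rough illustration of that swap, here is a minimal sketch, assuming flink-connector-kafka is on the classpath and a Flink version whose KafkaSourceBuilder provides setValueOnlyDeserializer (older builders may only offer setDeserializer, so check the connector docs for your version). The class name KafkaSourceMigrationSketch, the broker address, topic and group id, and the use of SimpleStringSchema in place of the custom deserializer are all hypothetical placeholders. The essential change is env.fromSource(...) with a KafkaSource instead of env.addSource(...) with a FlinkKafkaConsumer.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class; all connection values below are placeholders.
public class KafkaSourceMigrationSketch {

    // Builds a FLIP-27 KafkaSource (the new source interface) instead of a FlinkKafkaConsumer.
    static KafkaSource<String> buildSource(String brokers, String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // SimpleStringSchema stands in for the job's own DeserializationSchema.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Old style, which runs on a "Legacy Source Thread":
        //   env.addSource(new FlinkKafkaConsumer<>(topic, schema, props))
        // New style: fromSource(...) with a source name, then the usual chaining.
        DataStream<String> metrics = env
                .fromSource(buildSource("localhost:9092", "metrics", "metric-consumer"),
                        WatermarkStrategy.noWatermarks(), "MetricSource")
                .uid("MetricSource");
        metrics.print();
        env.execute("kafka-source-migration-sketch");
    }
}
```

Because fromSource still returns a regular DataStream, calls such as .uid(...) and .filter(...) can stay chained after it as before.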
Re: "Legacy Source Thread" line in logs
Thanks, Fabian, for replying. But I am only using KafkaSource. The code looks something like this:

class MetricSource {
    final Set metricSdms = new HashSet();
    ...
    env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
        .name(MetricSource.class.getSimpleName())
        .uid(MetricSource.class.getSimpleName())
        .filter(sdm -> metricSdms.contains(sdm.getType()));
}

class MetricKafkaSourceFactory {
    public static FlinkKafkaConsumer createConsumer(final Configuration jobParams) {
        ...
        return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
    }
}

On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul wrote:
> Hi Debraj,
> [...]
Re: "Legacy Source Thread" line in logs
Hi Debraj,

By "Legacy Source Thread" we refer to all sources that do not implement the new interface yet [1]. Currently, only the Hive and Kafka sources and the FileSource have been migrated. In general, there is no severe downside to using the older sources, but in the future we plan to implement only sources based on the new operator model.

Best,
Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
"Legacy Source Thread" line in logs
Hi,

I am seeing the log line below in Flink 1.13.0 running on YARN:

2021-06-23T13:41:45.761Z WARN grid.task.MetricSdmStalenessUtils Legacy Source Thread - Source: MetricSource -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (1/1)#0 updateMetricStalenessInHisto:32 Received a non-positive staleness = -194239 at 1624455705761

Can someone tell me what "Legacy Source Thread" denotes? I saw the same question here <http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Legacy-Source-Thread-quot-line-in-logs-td33941.html#a44594> in the deprecated mailing list archive with no answer, so I am starting a new email thread here.
Re: "Legacy Source Thread" line in logs
Hi KristoffSC,

The short answer is: you probably have a differently configured logger, which logs in a different format or at a different level.

The longer answer: all source connectors currently use the legacy source thread. That will only change once FLIP-27 [1] is widely adopted. FLIP-27 was originally planned to arrive sooner; that's why the name of the source thread contains "legacy".

Going a bit further into the details: in Flink < 1.9, each task used several threads to do its work. Since 1.9, all tasks use a single thread with a mailbox model (somewhat like a java.util.concurrent.Executor). However, one type of task couldn't be refactored: source tasks. They had to stick with the old model, because the source interfaces assume that each source connector spawns its own thread and pushes its messages. The new FLIP-27 interfaces will be pull-based, so that the mailbox model can be used for sources as well.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On Fri, Mar 27, 2020 at 12:40 PM KristoffSC wrote:
> Hi all,
> [...]
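To make the push-versus-pull distinction concrete, here is a toy model in plain Java. It uses no Flink classes, and all names in it (SourceThreadingSketch, the "Task Mailbox Thread" name, the helper methods) are hypothetical. The legacy style spawns a dedicated thread, named after the "Legacy Source Thread" prefix seen in the logs, that pushes records while holding a lock; the pull style lets a single task thread take actions from a mailbox queue, one of which polls a reader for the next record. As a simplification, the poll re-enqueues itself as a mail; this is only a sketch of the idea, not Flink's actual task or mailbox implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: class, method and thread names are hypothetical, not Flink's API.
public class SourceThreadingSketch {

    // Names of the threads that emitted each record, kept for inspection.
    static final List<String> legacyEmitThreads = new ArrayList<>();
    static final List<String> pulledEmitThreads = new ArrayList<>();

    // Push model: the source spawns its own thread and pushes every record itself.
    static List<String> runLegacyPushSource(List<String> records) throws InterruptedException {
        Object checkpointLock = new Object(); // mimics emitting under a lock shared with task work
        List<String> out = new ArrayList<>();
        Thread sourceThread = new Thread(() -> {
            for (String r : records) {
                synchronized (checkpointLock) {
                    out.add(r);
                    legacyEmitThreads.add(Thread.currentThread().getName());
                }
            }
        }, "Legacy Source Thread");
        sourceThread.start();
        sourceThread.join(); // join() makes the source thread's writes visible here
        return out;
    }

    // Pull model: one task thread takes actions from a mailbox; polling the source is one such action.
    static List<String> runPullSourceOnMailbox(List<String> records) throws InterruptedException {
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        Iterator<String> reader = records.iterator(); // stands in for a pull-based source reader
        List<String> out = new ArrayList<>();
        boolean[] finished = {false}; // only read and written on the task thread

        Runnable pollNext = new Runnable() {
            @Override
            public void run() {
                if (reader.hasNext()) {
                    out.add(reader.next());
                    pulledEmitThreads.add(Thread.currentThread().getName());
                    mailbox.add(this); // schedule the next poll behind any other queued mail
                } else {
                    finished[0] = true;
                }
            }
        };
        mailbox.add(pollNext);

        Thread taskThread = new Thread(() -> {
            try {
                while (!finished[0]) {
                    mailbox.take().run(); // every action runs on this single thread, so no lock is needed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Task Mailbox Thread");
        taskThread.start();
        taskThread.join();
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> records = List.of("a", "b", "c");
        System.out.println(runLegacyPushSource(records) + " emitted by " + legacyEmitThreads);
        System.out.println(runPullSourceOnMailbox(records) + " emitted by " + pulledEmitThreads);
    }
}
```

Running main prints "[a, b, c] emitted by [Legacy Source Thread, Legacy Source Thread, Legacy Source Thread]" followed by "[a, b, c] emitted by [Task Mailbox Thread, Task Mailbox Thread, Task Mailbox Thread]": the same records, but only the push model needs an extra thread of its own.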
"Legacy Source Thread" line in logs
Hi all,

When I run Flink from the IDE, I can see the prefix "Legacy Source Thread" in the logs. When I run the same job as a JobCluster on Docker, this prefix is not present. What does this prefix mean? By the way, I'm using [1] as the ActiveMQ connector.

Thanks.

[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/