Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian again for the clarification.

On Thu, Jun 24, 2021 at 8:16 PM Fabian Paul wrote:

> Hi Debraj,
>
> Sorry for the confusion. The FlinkKafkaConsumer is the old source; you can
> find the overhauled one here [1].
> You would need to replace the FlinkKafkaConsumer with the KafkaSource to
> stop seeing the message.
>
> Best
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75
>
>


Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Fabian Paul
Hi Debraj,

Sorry for the confusion. The FlinkKafkaConsumer is the old source; you can
find the overhauled one here [1].
You would need to replace the FlinkKafkaConsumer with the KafkaSource to
stop seeing the message.

Best
Fabian


[1] 
https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75



Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian for replying. But I am using KafkaSource only.

The code is something like below.

class MetricSource {
  final Set metricSdms = new HashSet();
  ...
  env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
      .name(MetricSource.class.getSimpleName())
      .uid(MetricSource.class.getSimpleName())
      .filter(sdm -> metricSdms.contains(sdm.getType()));
}

class MetricKafkaSourceFactory {
  public static FlinkKafkaConsumer createConsumer(final Configuration jobParams) {
    ...
    return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
  }
}


On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul wrote:

> Hi Debraj,
>
> By "Legacy Source Thread" we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and File sources
> have been migrated. In general, there is no severe downside to using the
> older sources, but in the future we plan to implement only sources based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>


Re: "Legacy Source Thread" line in logs

2021-06-23 Thread Fabian Paul
Hi Debraj,

By "Legacy Source Thread" we refer to all sources which do not implement the
new interface yet [1]. Currently only the Hive, Kafka and File sources
have been migrated. In general, there is no severe downside to using the
older sources, but in the future we plan to implement only sources based on
the new operator model.

Best,
Fabian

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



"Legacy Source Thread" line in logs

2021-06-23 Thread Debraj Manna
Hi

I am seeing the logs below in Flink 1.13.0 running on YARN

2021-06-23T13:41:45.761Z WARN grid.task.MetricSdmStalenessUtils Legacy
Source Thread - Source: MetricSource -> Filter -> MetricStoreMapper ->
(Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink:
FlinkKafkaProducer11, Sink: TSDBSink14) (1/1)#0
updateMetricStalenessInHisto:32 Received a non-positive staleness = -194239
at 1624455705761

Can someone let me know what "Legacy Source Thread" denotes?

I saw the same question here
<http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/quot-Legacy-Source-Thread-quot-line-in-logs-td33941.html#a44594>
in the deprecated mailing list with no answer. So starting a new email
thread here.


Re: "Legacy Source Thread" line in logs

2020-03-27 Thread Arvid Heise
Hi KristoffSC,

the short answer is: you probably have differently configured loggers. They
log in a different format or at a different level.
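
To make the short answer concrete, here is a minimal sketch in plain Java
(no Flink or logging framework involved). It assumes the prefix is simply the
name of the thread that wrote the log line, which a logger prints only if its
format asks for it (in Log4j patterns that is the %t conversion). The class
ThreadNameInLogs and its methods are hypothetical names made up for this
illustration, not part of Flink.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadNameInLogs {

    /** Formats a log line, optionally prefixing the name of the calling thread. */
    static String format(boolean includeThreadName, String level, String message) {
        String thread = includeThreadName ? Thread.currentThread().getName() + " - " : "";
        return level + " " + thread + message;
    }

    /** Calls format(...) on a thread with the given name and returns the resulting line. */
    static String logFromThread(String threadName, boolean includeThreadName, String message) {
        AtomicReference<String> line = new AtomicReference<>();
        Thread t = new Thread(
                () -> line.set(format(includeThreadName, "WARN", message)), threadName);
        t.start();
        try {
            t.join(); // join() makes the value written by t visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return line.get();
    }

    public static void main(String[] args) {
        // Thread name modelled on the prefix seen in the logs of this thread.
        String name = "Legacy Source Thread - Source: MetricSource";
        // Format that includes the thread name: the prefix shows up.
        System.out.println(logFromThread(name, true, "Received a non-positive staleness"));
        // Format that omits the thread name: same thread, no prefix.
        System.out.println(logFromThread(name, false, "Received a non-positive staleness"));
    }
}
```

The same job on the same thread produces both lines; only the format differs,
which would explain why the prefix shows up in one environment and not in
another.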

The longer answer: all source connectors currently use the legacy source
thread. That will only change once FLIP-27 [1] is widely adopted. It was
originally planned to come sooner, which is why the name of the source thread
contains "legacy".
Going a bit further into the details: in Flink <1.9, each task used several
threads to do its work. Since 1.9, all tasks use a single thread with
a mailbox model (somewhat like a java.util.concurrent.Executor). However,
one type of task couldn't be refactored: source tasks. They had to stick
with the old model, because the source interfaces assume that each source
connector spawns its own thread and pushes its messages. The new
interfaces from FLIP-27 will be pull-based, so that we can also use the
mailbox model for them.
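
The difference between the two models can be sketched in plain Java. This is
only an illustration under simplifying assumptions, not Flink's actual
implementation: the interfaces PushSource and PullSource, the class
SourceModels, and the thread name "Task Mailbox Thread" are all hypothetical.
Each emitted record is tagged with the name of the thread that emitted it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class SourceModels {

    /** Legacy style: the source owns its loop and pushes records to a collector. */
    interface PushSource { void run(Consumer<String> collector); }

    /** Pull style (simplified): the runtime asks for the next record; null means "done". */
    interface PullSource { String pollNext(); }

    /** Runs a push source on its own dedicated thread, as the old model requires. */
    static List<String> runPush(PushSource source) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Thread t = new Thread(
                () -> source.run(r -> out.add(Thread.currentThread().getName() + ": " + r)),
                "Legacy Source Thread");
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    /** Polls a pull source from a single-threaded executor standing in for the mailbox. */
    static List<String> runPull(PullSource source) {
        List<String> out = new ArrayList<>();
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "Task Mailbox Thread"));
        try {
            boolean more = true;
            while (more) {
                // Each poll is one unit of work ("mail") executed by the task thread.
                more = mailbox.submit(() -> {
                    String r = source.pollNext();
                    if (r == null) {
                        return false;
                    }
                    out.add(Thread.currentThread().getName() + ": " + r);
                    return true;
                }).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            mailbox.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        System.out.println(runPush(collector -> data.forEach(collector)));
        Iterator<String> it = data.iterator();
        System.out.println(runPull(() -> it.hasNext() ? it.next() : null));
    }
}
```

In the push variant the records come from the extra source thread, so anything
logged while processing them carries that thread's name. In the pull variant
the single task thread does all the work, so no separate source thread exists.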

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On Fri, Mar 27, 2020 at 12:40 PM KristoffSC wrote:

> Hi all,
> When I run Flink from the IDE I can see this prefix in the logs:
> "Legacy Source Thread"
>
> Running the same job as a JobCluster on Docker, this prefix is not present.
> What does this prefix mean?
> Btw, I'm using [1] as ActiveMQ connector.
>
> Thanks.
>
> [1]
> https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


"Legacy Source Thread" line in logs

2020-03-27 Thread KristoffSC
Hi all,
When I run Flink from the IDE I can see this prefix in the logs:
"Legacy Source Thread"

Running the same job as a JobCluster on Docker, this prefix is not present.
What does this prefix mean?
Btw, I'm using [1] as ActiveMQ connector.

Thanks.

[1]
https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq




