Use Flink to process request with list of queries and aggregate

2021-01-08 Thread Li Wang
Hi Team, I have a Flink application reading from Kafka. Each payload is a request sent by a user containing a list of queries. What I would like to do is use Flink to process the queries in parallel, aggregate the results, and send them back to the user. For example, let's say we have two messages in Kaf

Normalizing Metric Identifies for Reporting Systems

2021-01-08 Thread Joseph Lorenzini
Hi all, I am implementing a metric reporter for New Relic. I’d like it to support a job’s operator metrics that come with the Flink framework out of the box. In order to ensure each metric is unique you can’t use the metric name, you need to use the metric identifier. However, I am not

Re: Normalizing Metric Identifies for Reporting Systems

2021-01-08 Thread Chesnay Schepler
One thing you could do is take the first N characters and hash the remaining ones; I don't think there is a better solution at the moment. The size of job/task/operator names is a rather fundamental issue that makes a lot of things complicated (metrics, logging, UI), but we haven't made any prog
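The "first N characters plus a hash of the rest" idea can be sketched in plain Java as follows. This is only an illustration of the suggestion, not code from Flink or from any reporter: the class name `MetricIdShortener`, the method `shorten`, the `.` separator, the choice of SHA-256, and the 16-character hash length are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper (not part of Flink): keeps a readable prefix of a
// metric identifier and replaces the remainder with a short, stable hash.
final class MetricIdShortener {
    private MetricIdShortener() {}

    // Returns the identifier unchanged if it already fits in prefixLength
    // characters; otherwise returns prefix + "." + 16 hex chars of SHA-256(rest).
    static String shorten(String identifier, int prefixLength) {
        if (identifier.length() <= prefixLength) {
            return identifier;
        }
        String prefix = identifier.substring(0, prefixLength);
        String rest = identifier.substring(prefixLength);
        return prefix + "." + sha256Hex(rest).substring(0, 16);
    }

    // Hex-encodes the SHA-256 digest of the UTF-8 bytes of the input.
    private static String sha256Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be available on every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }
}
```

Because the hash depends only on the truncated remainder, the same long identifier always maps to the same shortened name, while two identifiers that share a prefix but differ later will, barring a hash collision, stay distinct.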

RE: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread VINAY.RAICHUR
Thanks Aljoscha for your prompt response. It means a lot to me 😊 Could you also attach the code snippet for `KafkaSource`, `KafkaSourceBuilder`, and `OffsetInitializers` that you were referring to in your previous reply, for my reference, to make it clearer for me? Kind regards, Vinay

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread Aljoscha Krettek
Hi, for your point 3. you can look at `FlinkKafkaConsumerBase.setStartFromTimestamp(...)`. Points 1. and 2. will not work with the well established `FlinkKafkaConsumer`. However, it should be possible to do it with the new `KafkaSource` that was introduced in Flink 1.12. It might be a bit r

Re: Flink taskmanager id

2021-01-08 Thread Till Rohrmann
Hi Omkar, Since version 1.12.0 you can configure the TaskManager's resource id via `taskmanager.resource-id` [1]. Moreover, if not set, then it defaults to rpcAddress:rpcPort and a 6 digit random suffix. [1] https://issues.apache.org/jira/browse/FLINK-17579 Cheers, Till On Thu, Jan 7, 2021 at 1

Re: How should I process a cumulative counter?

2021-01-08 Thread Aljoscha Krettek
Hi Larry, the basic problem for your use case is that window boundaries are inclusive for the start timestamp and exclusive for the end timestamp. It's set up like this to ensure that consecutive tumbling windows don't overlap. This is only a function of how our `WindowAssigner` works, so it
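The inclusive-start, exclusive-end behaviour can be illustrated with a small stand-alone sketch. It only mirrors the arithmetic of assigning a timestamp to a tumbling window and is not Flink's `WindowAssigner` code; the class `TumblingWindowBounds` and its methods are hypothetical names, and window offsets are ignored for simplicity.

```java
// Hypothetical illustration (not Flink code): computes which tumbling
// window [start, end) a timestamp falls into, with no window offset.
final class TumblingWindowBounds {
    private TumblingWindowBounds() {}

    // Start of the window containing the timestamp (inclusive bound).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // End of the window containing the timestamp (exclusive bound).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }
}
```

With a window size of 1000 ms, a timestamp of 999 lands in the window [0, 1000), while a timestamp of exactly 1000 lands in [1000, 2000). An element stamped exactly on a boundary therefore belongs to the later window, which is why consecutive windows never share an element.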

Re: Flink app logs to Elastic Search

2021-01-08 Thread Aljoscha Krettek
So you're saying there is no logging output whatsoever being sent to Elasticsearch? Did you try and see if the jar file is being picked up? Are you still getting the pre-defined, text-based logging output? Best, Aljoscha On 2021/01/07 17:04, bat man wrote: Hi Team, I have a requirement to p

Re: Question about "NoWatermark" in Flink 1.9.2

2021-01-08 Thread Aljoscha Krettek
Thanks for the update! Best, Aljoscha On 2021/01/07 16:45, Peter Huang wrote: Hi, We ended up finding the root cause. Since a certain point in time, two of the partitions of the input topic haven't had any data, so the second window operator in the pipeline can't receive the watermark of all of the

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-08 Thread Arvid Heise
Hi Dongwon, inferring the type information of Java classes is quite messy. At first, it seems like that should work out of the box as you are only using `A` as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possi