Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread David Morávek
Hi Rion, you just need to call *sink.setRuntimeContext(getRuntimeContext())* before opening the child sink. Please see *AbstractRichFunction* [1] (which ElasticsearchSink extends) for more details. One more note: instead of starting with an integration test, I'd recommend writing a unit test using *
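A minimal sketch of the pattern David describes, written in plain Java so it runs without Flink on the classpath. `Context`, `RichSink`, `Routed` and `RoutingSink` are hypothetical stand-ins, not Flink's `RuntimeContext` or `ElasticsearchSink` API. The point is only the ordering (inject the parent's runtime context into each child, then open it), plus caching one child per routing key as discussed later in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class RoutingSinkSketch {

    /** Hypothetical stand-in for Flink's RuntimeContext (illustration only). */
    record Context(String taskName, int subtaskIndex) {}

    /** Hypothetical stand-in for a rich sink: context is injected first, then open() runs. */
    abstract static class RichSink<T> {
        private Context context;

        void setRuntimeContext(Context ctx) {
            this.context = ctx;
        }

        Context getRuntimeContext() {
            if (context == null) {
                throw new IllegalStateException("runtime context was not set before open()");
            }
            return context;
        }

        abstract void open();

        abstract void invoke(T value);

        void close() {}
    }

    /** Hypothetical element that carries its own routing key (e.g. a tenant id). */
    record Routed<T>(String routeKey, T payload) {}

    /** Parent sink that lazily creates, caches and delegates to one child sink per route. */
    static final class RoutingSink<T> extends RichSink<Routed<T>> {
        private final Function<String, RichSink<T>> childFactory;
        private final Map<String, RichSink<T>> children = new HashMap<>();

        RoutingSink(Function<String, RichSink<T>> childFactory) {
            this.childFactory = childFactory;
        }

        @Override
        void open() {
            // Children are created and opened lazily on first use.
        }

        @Override
        void invoke(Routed<T> value) {
            RichSink<T> child = children.computeIfAbsent(value.routeKey(), key -> {
                RichSink<T> sink = childFactory.apply(key);
                sink.setRuntimeContext(getRuntimeContext()); // must happen BEFORE open()
                sink.open();
                return sink;
            });
            child.invoke(value.payload());
        }

        @Override
        void close() {
            children.values().forEach(RichSink::close);
            children.clear();
        }

        int cachedChildren() {
            return children.size();
        }
    }
}
```

In a real job the same two calls (setRuntimeContext, then open) would go wherever the child sink is created; caching per key avoids building a new connection for every record.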

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-23 Thread Ingo Bürk
Hi, just FYI, we do already have issues in JIRA for this: * https://issues.apache.org/jira/browse/FLINK-21949 * https://issues.apache.org/jira/browse/FLINK-22484 Best Ingo On Tue, Aug 24, 2021 at 8:23 AM Caizhi Weng wrote: > Hi! > > As far as I know, returning an array from the getValue metho

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-23 Thread Caizhi Weng
Hi! As far as I know, returning an array from the getValue method containing external data format is OK. Flink will do the conversion for you. Are you faced with any exception when using this array_agg? If yes what's the exception stack? You can also open a JIRA ticket to require a built-in supp

Re: map concurrent modification exception analysis when checkpoint

2021-08-23 Thread yidan zhao
The issue has been resolved, as I said in the previous email. It is caused by the async function: every record processed by the async function becomes state in the async operator, and here that record is a map type (UserAccessLog). On Mon, Aug 23, 2021 at 11:26 PM Arvid Heise wrote: > I don't see anything suspicious in your
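A single-threaded simulation of that failure mode, assuming (as described above) that the async operator keeps each in-flight record as state and serializes it on checkpoint while user code may still be mutating the same map. `CmeSketch` and its methods are hypothetical; `serialize` only stands in for a map serializer walking the entries. Handing the operator a copy, or never mutating a record after emitting it, avoids the exception.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

class CmeSketch {

    /** Stand-in for a serializer iterating a map entry by entry during a checkpoint. */
    static int serialize(Map<String, String> record, Runnable concurrentMutation) {
        int written = 0;
        for (Map.Entry<String, String> entry : record.entrySet()) {
            written++;
            if (written == 1) {
                // Simulates other code (e.g. an async callback) changing the map mid-iteration.
                concurrentMutation.run();
            }
        }
        return written;
    }

    /** Returns true if serializing the shared map while it is mutated throws CME. */
    static boolean failsWhenShared() {
        Map<String, String> log = new HashMap<>(Map.of("user", "u1", "path", "/a"));
        try {
            serialize(log, () -> log.put("extra", "x"));
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    /** Returns true if the operator-held copy is unaffected by later mutation of the original. */
    static boolean safeWithCopy() {
        Map<String, String> log = new HashMap<>(Map.of("user", "u1", "path", "/a"));
        Map<String, String> inFlight = new HashMap<>(log); // defensive copy handed to the operator
        serialize(inFlight, () -> log.put("extra", "x"));
        return inFlight.size() == 2;
    }
}
```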

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread Rion Williams
Hi David, Thanks again for the response, I believe that I'm getting pretty close for at least a POC-level implementation of this. Currently, I'm working with JsonObject instances throughout the pipeline, so I wanted to try this out and simply stored the routing information within the element itsel

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Actually, we are using the `FlinkKafkaConsumer` [0] rather than `KafkaSource`. Is there a way to disable the consumer metrics using `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue? [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink
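For reference, a sketch of the consumer properties. To my knowledge, FlinkKafkaConsumer reads the `flink.disable-metrics` property and, when it is `true`, does not register the Kafka client's metrics with Flink; the newer KafkaSource uses `register.consumer.metrics=false` for the same purpose. Whether this removes the Metaspace growth in this particular setup is untested here. `KafkaConsumerMetricsConfig` is a hypothetical helper, and the broker address and group id are placeholders.

```java
import java.util.Properties;

class KafkaConsumerMetricsConfig {

    /** Builds FlinkKafkaConsumer properties with Kafka client metric forwarding turned off. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // placeholder value
        props.setProperty("group.id", groupId);                   // placeholder value
        // Read by FlinkKafkaConsumer: do not register the Kafka consumer metrics with Flink.
        props.setProperty("flink.disable-metrics", "true");
        return props;
    }
}
```

The resulting `Properties` would then be passed to the `FlinkKafkaConsumer` constructor as usual.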

Re: Kafka Metrics

2021-08-23 Thread Arvid Heise
Hi Mason, We always appreciate contributions but I think that this metric reporter would need to be beefed up quite a bit to be of general use. In this case, it works because there is an existing relatively narrow group to deny, but there are plenty of other cases where this isn't the case (like m

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Thanks Arvid! I will give this a try and report back. On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise wrote: > Hi Kevin, > > "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes > have been loaded. [1] > If you only see that after a while, it's indicating that there is a > classl

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-23 Thread Zbyszko Papierski
Yes, it turned out that we had only configured TM<->JM communication correctly; the inter-TM config was missing, so that traffic fell under "reject all". Thanks for the suggestion! On Mon, Aug 23, 2021 at 5:29 PM Arvid Heise wrote: > It rather looks to me as if the task manager can not communicate with each > other.

Re: Kafka Metrics

2021-08-23 Thread Mason Chen
Sweet, I suspected it but I thought I might ask anyway. Consequently, I've implemented a deny list feature for my reporter (based on groupNameKey and metricName). The reporter will skip reporting metrics if a metric's variables set contains keys that map to the groupNameKey and if the metric has a
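A plain-Java sketch of a delegating deny-list reporter like the one described above (and suggested by Arvid further down). `Reporter` is a hypothetical, simplified callback rather than Flink's `MetricReporter` interface, and the variable key `<partition>` and metric names used with it are illustrative only. Since the description is cut off, the rule here is one reading of it: drop a metric if its variables contain the rule's key and its name is on that key's deny list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DenyListReporterSketch {

    /** Hypothetical, simplified stand-in for a metric reporter callback. */
    interface Reporter {
        void notifyOfAddedMetric(String metricName, Map<String, String> variables);
    }

    /** Delegating reporter that drops metrics matching a (group variable key, metric name) rule. */
    static final class DenyListReporter implements Reporter {
        private final Reporter delegate;
        private final Map<String, Set<String>> denied; // groupNameKey -> denied metric names

        DenyListReporter(Reporter delegate, Map<String, Set<String>> denied) {
            this.delegate = delegate;
            this.denied = denied;
        }

        @Override
        public void notifyOfAddedMetric(String metricName, Map<String, String> variables) {
            for (Map.Entry<String, Set<String>> rule : denied.entrySet()) {
                if (variables.containsKey(rule.getKey()) && rule.getValue().contains(metricName)) {
                    return; // matched a deny rule: never forwarded to the real reporter
                }
            }
            delegate.notifyOfAddedMetric(metricName, variables);
        }
    }

    /** Tiny recording reporter used to observe what gets forwarded. */
    static final class Recording implements Reporter {
        final List<String> names = new ArrayList<>();

        @Override
        public void notifyOfAddedMetric(String metricName, Map<String, String> variables) {
            names.add(metricName);
        }
    }
}
```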

Re: DataStream to Table API

2021-08-23 Thread Matthias Broecheler
Perfect, that worked. Thanks a lot, JING! On Sun, Aug 22, 2021 at 1:25 AM JING ZHANG wrote: > Hi Matthias, > Before the bug is fixed, you could specify the return type explicitly in > the second parameter of the map function. > > DataStream rows = integers.map(i -> Row.of("Name"+i, i)); -> >

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread David Morávek
Hi Rion, Sorry for the late reply, I missed your previous message. Thanks Arvid for the reminder <3. something like a MessageWrapper and pass those > elements to the sink, which would create the tenant-specific Elastic > connection from the ConfigurationT element and handle caching it and then > j

Can't start FlinkKafkaProducer using SSL

2021-08-23 Thread Wouter Zorgdrager
Hi all, I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster. Unfortunately, I'm getting the following exception: Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common. KafkaException: Failed to construct kafka producer at org.apache.flink.kafka.shaded.org.apach

Flink 1.13.1 Kafka Producer Error

2021-08-23 Thread Debraj Manna
I am trying to use the Flink Kafka producer like below: public static FlinkKafkaProducer createProducer() { Properties props = new Properties(); props.setProperty("bootstrap.servers", ""); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.ge

Re: Flink's Kafka Offset Management

2021-08-23 Thread Arvid Heise
It's basically as you have said. If you resume from a checkpoint/savepoint (./bin/flink run -s <savepointPath>), Flink will always use the offset that has been stored inside it. If you don't resume from a checkpoint, it depends on how you have configured the consumer. If you have supplied a group.id and left the
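A simplified model of the start-position logic described above, assuming the consumer's default start mode (group offsets); explicit modes such as setStartFromEarliest would replace the middle step when not restoring. `StartOffsetSketch` is hypothetical. Restored checkpoint/savepoint state always wins; otherwise the offset committed for the group.id is used; otherwise Kafka's `auto.offset.reset` decides. With checkpointing enabled, FlinkKafkaConsumer by default commits offsets back to Kafka when a checkpoint completes, so a restart without `-s` would typically continue from the offsets of the last completed checkpoint.

```java
import java.util.Optional;

class StartOffsetSketch {

    enum Origin { CHECKPOINT, COMMITTED_GROUP_OFFSET, AUTO_OFFSET_RESET }

    /** offset is null when the Kafka client's auto.offset.reset decides (earliest/latest). */
    record Decision(Origin origin, Long offset) {}

    /** Decides where one partition starts reading, in the order described in the thread. */
    static Decision startFor(Optional<Long> restoredOffset, Optional<Long> committedGroupOffset) {
        if (restoredOffset.isPresent()) {
            // Resumed with -s (or after a failover): state in the checkpoint/savepoint wins.
            return new Decision(Origin.CHECKPOINT, restoredOffset.get());
        }
        if (committedGroupOffset.isPresent()) {
            // Fresh start with a group.id that has committed offsets in Kafka.
            return new Decision(Origin.COMMITTED_GROUP_OFFSET, committedGroupOffset.get());
        }
        return new Decision(Origin.AUTO_OFFSET_RESET, null);
    }
}
```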

Re: Keystore format limitations for TLS

2021-08-23 Thread Alexander Fedulov
Hi Alexis, the first step would be to verify whether the keystore that you are trying to use is compatible with the Java version inside your Docker container (even before involving any Flink specifics). Try the following: - Run your Flink Docker container locally - Mount a folder wit
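A small Java check along those lines, assuming it is run with the same JDK as the Flink image (for example inside the container, against the mounted folder). It only uses `java.security.KeyStore`; `KeystoreCheck` is a hypothetical name, and the path, keystore type and password come from the command line. If loading fails here, the JVM itself cannot read the file, independent of Flink. `keytool -list -keystore <file>` from the same JDK is the command-line equivalent.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

class KeystoreCheck {

    /** Loads a keystore with the running JVM's providers and returns its aliases. */
    static List<String> aliases(InputStream in, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance(type); // e.g. "PKCS12" or "JKS"
        ks.load(in, password);
        return Collections.list(ks.aliases());
    }

    /** Usage: java KeystoreCheck <path> <type> <password> */
    public static void main(String[] args) throws Exception {
        try (InputStream in = Files.newInputStream(Path.of(args[0]))) {
            System.out.println("Java " + System.getProperty("java.version")
                    + " loaded aliases: " + aliases(in, args[1], args[2].toCharArray()));
        }
    }
}
```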

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-23 Thread Arvid Heise
It looks to me rather as if the task managers cannot communicate with each other. Can you check your network policies? Are they allowed to communicate on random ports? On Mon, Aug 23, 2021 at 8:37 AM Zbyszko Papierski wrote: > Hi, > > No, they don't - only the job is being restarted after that,

Re: map concurrent modification exception analysis when checkpoint

2021-08-23 Thread Arvid Heise
I don't see anything suspicious in your code. The stacktrace is also for a MapSerializer. Do you have another operator where you put a Map into custom state? On Fri, Aug 20, 2021 at 6:43 PM yidan zhao wrote: > But, I do not know why this leads to the job's failure and recovery > since I have set

Re: Kafka Metrics

2021-08-23 Thread Arvid Heise
Hi Mason, I'm afraid it's all-or-nothing: either you get the proxied metrics with all partitions or none. You could also implement a custom MetricReporter that delegates to your actual reporter and filters the respective metrics. Best, Arvid On Fri, Aug 20, 2021 at 8:16 AM Mason Chen wrote

Re: Looking for suggestions about multithreaded CEP to be used with flink

2021-08-23 Thread Arvid Heise
I'm afraid that Flink CEP is built for lots of data on a few rules. Using any other library would push the state management and data distribution onto you and negate the main idea of using CEP in the first place. The question is if some of the patterns could be aggregate

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Arvid Heise
Hi Kevin, "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes have been loaded. [1] If you only see that after a while, it's indicating that there is a classloader leak. I suspect that this is because of Kafka metrics. There have been some reports in the past. You can try to se

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-23 Thread Kevin Lam
Hi, I was able to figure out what was causing this. We were using the `fixed-delay` restart strategy with the maximum number of restarts set to 10. Switching to `exponential-delay` resolved the issue of the job being restarted from scratch. On Thu, Aug 19, 2021 at 2:04 PM Chesnay Schepler wrote: > How do you dep
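A rough model of why the two strategies behave differently, with jitter and the reset threshold ignored. `fixed-delay` gives up once the configured number of attempts is used up, whereas `exponential-delay` (enabled with `restart-strategy: exponential-delay` and tuned through options such as `restart-strategy.exponential-delay.initial-backoff`, `max-backoff` and `backoff-multiplier`) keeps retrying with a growing, capped delay and no attempt limit. `RestartBackoffSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RestartBackoffSketch {

    /** fixed-delay: returns the delay for this failure, or -1 once attempts are exhausted. */
    static long fixedDelayMs(int failureCount, int maxAttempts, long delayMs) {
        return failureCount <= maxAttempts ? delayMs : -1;
    }

    /** exponential-delay: delays for consecutive failures, multiplied each time, capped at maxMs. */
    static List<Long> exponentialDelaysMs(int failures, long initialMs, double multiplier, long maxMs) {
        List<Long> delays = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < failures; i++) {
            delays.add((long) Math.min(current, maxMs));
            current = Math.min(current * multiplier, maxMs);
        }
        return delays;
    }
}
```

For example, `exponentialDelaysMs(6, 1000, 2.0, 10_000)` yields 1000, 2000, 4000, 8000, 10000, 10000 ms, while `fixedDelayMs(11, 10, 5000)` signals that an eleventh failure would no longer be restarted.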

Re: Apache Flink matrics are not alligned in the reporter

2021-08-23 Thread Jawad Tahir
Hi Chesnay, Thank you for your message. My reporter interval is set to 1 second. On Thu, 19 Aug 2021 at 19:51, Chesnay Schepler wrote: > What reporter interval do you have configured? > > On 19/08/2021 13:31, Jawad Tahir wrote: > > Hi, > > > > I have defined Graphite as my metrics reporter with

Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-23 Thread Yuval Itzchakov
Hi, I'm trying to implement a generic ARRAY_AGG UDF (identical to the one that exists in many data warehouses, e.g. https://docs.snowflake.com/en/sql-reference/functions/array_agg.html) to use in Flink SQL. Taking reference from CollectAggFunction
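A plain-Java sketch of the accumulator lifecycle such an ARRAY_AGG needs. It deliberately does not extend Flink's `AggregateFunction`; the method names only mirror that contract (`createAccumulator`, `accumulate`, `retract`, `merge`, `getValue`). `ArrayAggSketch` is hypothetical, and skipping NULL inputs is an assumption modelled on warehouse ARRAY_AGG functions. A real generic UDF would presumably also need explicit type inference for the element type.

```java
import java.util.ArrayList;
import java.util.List;

class ArrayAggSketch<T> {

    /** Accumulator: the values collected so far, in arrival order. */
    static final class Acc<E> {
        final List<E> values = new ArrayList<>();
    }

    Acc<T> createAccumulator() {
        return new Acc<>();
    }

    void accumulate(Acc<T> acc, T value) {
        if (value != null) { // assumption: NULL inputs are skipped
            acc.values.add(value);
        }
    }

    /** Removes one occurrence; needed when the input is an updating (retracting) stream. */
    void retract(Acc<T> acc, T value) {
        acc.values.remove(value);
    }

    /** Combines partial accumulators, e.g. from session-window merges. */
    void merge(Acc<T> acc, Iterable<Acc<T>> others) {
        for (Acc<T> other : others) {
            acc.values.addAll(other.values);
        }
    }

    /** Returns an immutable snapshot so later accumulate() calls cannot change emitted results. */
    List<T> getValue(Acc<T> acc) {
        return List.copyOf(acc.values);
    }
}
```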

Re: Using RMQ connector in pyflink

2021-08-23 Thread Nadia Mostafa
But RMQ is not listed among the supported Table API connectors; should I define a custom source? On Mon, Aug 23, 2021 at 4:56 AM Caizhi Weng wrote: > Hi! > > You can first use the Table & SQL API to create a RMQ source table[1]. > > Then you can use the to_data_stream method in TableEnvironment to

Flink's Kafka Offset Management

2021-08-23 Thread Pranjul Ahuja
I use FlinkKafkaConsumer to consume Kafka and enable checkpoints. Now I'm a little confused about the offset management and checkpoint mechanism. What is the behavior if I stop the application by executing the yarn application -kill appId and run the start command like ./bin/flink run ...? Will

Re: Looking for suggestions about multithreaded CEP to be used with flink

2021-08-23 Thread Schwalbe Matthias
Hi Tejas, I had your question sit in my mind for a while before I realized I had something to say about it 😊 Although not related to CEP, we had had a very similar problem with too many threads/tasks in an overwhelming split-join-pattern of about 1600 concurrent paths. A colleague of