Re: Flink Job cluster scalability

2020-01-08 Thread Zhu Zhu
Hi KristoffSC, Each task needs a slot to run. However, Flink enables slot sharing[1] by default so that one slot can host one parallel instance of each task in a job. That's why your job can start with 6 slots. However, different parallel instances of the same task cannot share a slot. That's why
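The slot rule in the reply above can be written down as arithmetic. This is a minimal sketch under stated assumptions: inside one slot sharing group, a slot holds one parallel instance of each operator, so the group needs as many slots as its most parallel operator; separate groups never share slots, so their needs add up. The operator list, the group names and the Op type are hypothetical. In a real job an operator is moved to another group with slotSharingGroup(String).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SlotEstimate {

    /** Hypothetical description of one operator in a job graph. */
    static final class Op {
        final String name;
        final String slotSharingGroup;
        final int parallelism;

        Op(String name, String slotSharingGroup, int parallelism) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    /** Sum over slot sharing groups of the highest parallelism inside each group. */
    static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.slotSharingGroup, op.parallelism, Math::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Op> job = Arrays.asList(
                new Op("source", "default", 2),
                new Op("map", "default", 6),
                new Op("sink", "default", 1));
        // All three operators share slots, so the most parallel one decides.
        System.out.println(requiredSlots(job));
    }
}
```

Moving only the map operator into its own group in this example would raise the requirement from 6 to 8 slots (2 for the default group plus 6 for the new one).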

Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Jayant Ameta
Hi, The elastic connector is packaged in the uber jar that is submitted. There is only one version of the connector: flink-connector-elasticsearch5_2.11:1.7.1. I'm using Flink 1.7.1. I couldn't figure out whether this error causes the job to fail, or whether I see this error when the job is restarting

Re: managedMemoryInMB failure

2020-01-08 Thread Xintong Song
Hi Fanbin, > On YARN setups, this value is automatically configured to the size of the > TaskManager's YARN container, minus a certain tolerance value. > If I understand correctly, you are running Flink standalone cluster both in docker and on EMR? If that is the case, then this sentence has noth

Re:Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin
hi, Thanks for the reply. I am using the default FsStateBackend rather than RocksDB, with checkpointing off. So I really cannot see any state info from the dashboard. I will research more details and see whether any alternative can be optimized. At 2020-01-08 19:07:08, "Benchao Li" wrote: >hi sun

Re: Flink logging issue with logback

2020-01-08 Thread Bajaj, Abhinav
Thanks Dawid, Max and Yang for confirming the issue and providing a potential workaround. On 1/8/20, 3:24 AM, "Maximilian Michels" wrote: Interesting that we came across this problem at the same time. We have observed this with Lyft's K8s operator which uses the REST API for job su

How can I find out which key group belongs to which subtask

2020-01-08 Thread 杨东晓
Hi, I'm trying to optimize a Flink 'keyBy' ProcessFunction. Is there any way I can find out which key group a key belongs to, and ultimately which subtask a key group belongs to? The motivation is that we want to force the data records from upstream st
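For reference, Flink hashes key.hashCode() (with a murmur hash) modulo the max parallelism to get a key group, then hands out key groups to subtasks in contiguous ranges. With flink-runtime on the classpath, the internal helper KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) returns the subtask index directly; being internal, it may change between versions. The sketch below reproduces only the key-group-to-subtask arithmetic as we understand it from that class; the hashing step is deliberately left out, and maxParallelism = 128 and parallelism = 3 are example values.

```java
class KeyGroupMath {

    /** Subtask index that owns a key group: keyGroupId * parallelism / maxParallelism. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** First key group (inclusive) owned by a subtask. */
    static int firstKeyGroup(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by a subtask. */
    static int lastKeyGroup(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int parallelism = 3;      // example value
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> key groups "
                    + firstKeyGroup(maxParallelism, parallelism, i) + ".."
                    + lastKeyGroup(maxParallelism, parallelism, i));
        }
    }
}
```

With these values it prints the ranges 0..42, 43..85 and 86..127. To steer a record to a particular subtask, one would pick a key whose key group falls inside that subtask's range.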

Re: kafka: how to stop consumption temporarily

2020-01-08 Thread David Morin
Awesome! I'm going to implement it. Thanks a lot Arvid. On Wed, Jan 8, 2020 at 12:00, Arvid Heise wrote: > I'd second Chesnay's suggestion to use a custom source. It would be a > piece of cake with FLIP-27 [1], but we are not there yet unfortunately. > It's probably in Flink 1.11 (mid year) if yo

Re: Session Window with dynamic gap

2020-01-08 Thread KristoffSC
Hi Aljoscha, Thanks for the response. This sounds ok for me. It's as if the message carries additional information that can "tell" operators how to handle this message. Maybe we could use this approach also for different use cases. I will try this approach, thanks.

Re: managedMemoryInMB failure

2020-01-08 Thread Fanbin Bu
Xintong, Thanks for looking into this. I changed the Docker setting for the number of CPUs to a lower value and it works now. I was using the same code and the same Flink version. The reason that it works on EMR is that I'm using a machine with large memory. According to the doc: JVM heap size for the TaskManagers, w

RE: Table API: Joining on Tables of Complex Types

2020-01-08 Thread Hailu, Andreas
Very well - I'll give this a try. Thanks, Dawid. // ah From: Dawid Wysakowicz Sent: Wednesday, January 8, 2020 7:21 AM To: Hailu, Andreas [Engineering] ; user@flink.apache.org Cc: Richards, Adam S [Engineering] Subject: Re: Table API: Joining on Tables of Complex Types Hi Andreas, Convertin

Flink Job cluster scalability

2020-01-08 Thread KristoffSC
Hi all, I must say I'm very impressed by Flink and what it can do. I was trying to play around with Flink operator parallelism and scalability, and I have a few questions regarding this subject. My setup is: 1. Flink 1.9.1 2. Docker Job Cluster, where each TaskManager has only one task slot. I'm f

Re: How to verify if checkpoints are asynchronous or sync

2020-01-08 Thread RKandoji
I evaluated performance by looking at the number of input records processed over 10-minute and 30-minute durations. Thanks, R On Wed, Jan 8, 2020 at 2:21 AM Congxian Qiu wrote: > If you want to figure out the performance problem, maybe async-profiler[1] > can be helpful > [1] https://github.com/jvm-prof

Re: Using async io in cep

2020-01-08 Thread Dawid Wysakowicz
If I am not mistaken, my previous answer is still valid. There is no way to have true asynchronicity within CEP conditions. Why do you want to use async io there? Did you hit performance issues? If so, you could try increasing the parallelism. Best, Dawid On 07/01/2020 02:47, 郑 洁锋 wrote: > H

Re: Submit high version compiled code jar to low version flink cluster?

2020-01-08 Thread Arvid Heise
If you explicitly need features that are only present in Flink 1.9, chances are high that your code will fail on older versions. If it's just about syntactic sugar, a valid option is to copy the new functions in your code and use that with the old version. However, if you refer to SQL queries then

Re: Flink Dataset to ParquetOutputFormat

2020-01-08 Thread Arvid Heise
Hi Anji, StreamingFileSink has a BucketAssigner that you can use for that purpose. From the javadoc: The sink uses a BucketAssigner to determine in which bucket directory each element should be written to inside the base directory. The BucketAssigner can, for example, use time or a property of t
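The heart of a custom BucketAssigner is its getBucketId method, which maps an element to a directory name relative to the base path. The sketch below writes that logic as a plain function so it runs without Flink; the Event type and the "tenant=.../dt=yyyy-MM-dd" layout are hypothetical choices. In a real job this body would sit in a class implementing BucketAssigner<Event, String>, with getSerializer() returning SimpleVersionedStringSerializer.INSTANCE.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class BucketIds {

    /** Hypothetical record type; only the fields used for bucketing are shown. */
    static final class Event {
        final String tenant;
        final long timestampMillis;

        Event(String tenant, long timestampMillis) {
            this.tenant = tenant;
            this.timestampMillis = timestampMillis;
        }
    }

    // Formatting an Instant requires a zone; UTC keeps bucket names stable across machines.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Bucket directory, relative to the sink's base path, for one element. */
    static String bucketId(Event e) {
        return "tenant=" + e.tenant + "/dt=" + DAY.format(Instant.ofEpochMilli(e.timestampMillis));
    }

    public static void main(String[] args) {
        // 1578441600000 ms is 2020-01-08T00:00:00Z, so this prints tenant=acme/dt=2020-01-08
        System.out.println(bucketId(new Event("acme", 1578441600000L)));
    }
}
```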

Re: Get consumer group offset

2020-01-08 Thread Arvid Heise
Hi Alex, it seems like your message got lost over Christmas. I don't completely understand the question. Do you mean that Flink does not pick up the consumer group anymore? Btw, out of curiosity, why are you still running Kafka 0.10? We are thinking about dropping support for older Kafka versions

Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Arvid Heise
Hi Jayant, if you only see it sometimes, that indicates that you have the class in two different versions of the connector, where the class loader order is non-deterministic. Could you post the classpath? Btw, it's always good to add which Flink version you use. Best, Arvid On Wed, Jan 8, 2020 at 12:20 PM
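One way to check the duplicate-class theory is to ask a class loader for every location that provides a given class file. This is a hedged diagnostic sketch, not something proposed in the thread; the class name DuplicateClassFinder is hypothetical, and the default lookup uses the outer class of the one named in the stack trace below. Running it from inside the job (for example with getClass().getClassLoader() in a rich function's open()) would presumably show what the user-code class loader sees.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

class DuplicateClassFinder {

    /** Every location visible to the loader that provides the class file, in the order it enumerates them. */
    static List<URL> findLocations(ClassLoader loader, String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0]
                : "org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty"
                + ".channel.socket.nio.NioClientSocketPipelineSink";
        List<URL> hits = findLocations(DuplicateClassFinder.class.getClassLoader(), name);
        hits.forEach(System.out::println);
        if (hits.size() > 1) {
            System.out.println("WARNING: " + hits.size() + " copies on the classpath");
        }
    }
}
```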

Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Chesnay Schepler
Could you clarify under what circumstances you see this issue? You say "sometimes"; is the job running normally and then failing due to this error? Does it happen when submitting/canceling/restarting a job, etc.? On 08/01/2020 12:20, Jayant Ameta wrote: Hi, I see the following error sometimes on

Re: Flink group with time-windowed join

2020-01-08 Thread jeremyji
Hi Dawid, I simplified my SQL; the original SQL is more complex and has an UNNEST select like: SELECT a.account, (SUM(a.value) + SUM(b.value)) as result, TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE) FROM (SELECT account,

Re: Table API: Joining on Tables of Complex Types

2020-01-08 Thread Dawid Wysakowicz
Hi Andreas, Converting your GenericRecords to Rows would definitely be the safest option. You can check how it's done in the org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can reuse the logic from there to write something like: DataSet dataset = ... dataset.map( /* conver

Re: Flink logging issue with logback

2020-01-08 Thread Maximilian Michels
Interesting that we came across this problem at the same time. We have observed this with Lyft's K8s operator which uses the Rest API for job submission, much like the Flink dashboard. Note that you can restore the original stdout/stderr in your program: private static void restoreStdOutAndS
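Max's snippet is cut off above, so the following is only an assumption about one common way to restore the original streams, not his code; the method name resetStandardStreams is hypothetical. It wraps the process's raw file descriptors 1 and 2 in fresh PrintStreams, bypassing any stream that was installed in their place, and returns the replaced streams in case they need to be put back. Calling it at the start of the job's main() is one option.

```java
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.PrintStream;

class StdStreams {

    /**
     * Re-points System.out and System.err at the process's original file
     * descriptors. Returns the previously installed streams {out, err}.
     */
    static PrintStream[] resetStandardStreams() {
        PrintStream[] previous = {System.out, System.err};
        System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out), true));
        System.setErr(new PrintStream(new FileOutputStream(FileDescriptor.err), true));
        return previous;
    }

    public static void main(String[] args) {
        resetStandardStreams();
        System.out.println("printed via the original stdout");
    }
}
```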

Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Jayant Ameta
Hi, I see the following error sometimes on my flink job, even though the class is present in my uber jar. java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1 at org.apache.flink.streaming.conne

Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread Benchao Li
hi sunfulin, As Kurt pointed out, if you use the RocksDB state backend, slow disk I/O may be bounding your job. You can check WindowOperator's latency metric to see how long it takes to process an element. Hope this helps. sunfulin wrote on Wed, Jan 8, 2020 at 4:04 PM: > Ah, I had checked resource usage and GC from fl

Re: kafka: how to stop consumption temporarily

2020-01-08 Thread Arvid Heise
I'd second Chesnay's suggestion to use a custom source. It would be a piece of cake with FLIP-27 [1], but we are not there yet unfortunately. It's probably in Flink 1.11 (mid year) if you can wait. The current way would be a source that wraps the two KafkaConsumers and blocks the normal consumer fr
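The blocking part of such a wrapping source could be a simple gate. This is a hedged sketch: the ConsumptionGate class is hypothetical, and the message above is truncated before it says what the normal consumer waits for, so the code only shows the mechanism (the normal consumer's thread calls awaitOpen() before each poll, and whatever controls the pause calls close() and open()).

```java
/** A re-closable gate: threads calling awaitOpen() block while it is closed. */
class ConsumptionGate {
    private boolean open;

    ConsumptionGate(boolean initiallyOpen) {
        this.open = initiallyOpen;
    }

    synchronized void close() {
        open = false;
    }

    synchronized void open() {
        open = true;
        notifyAll(); // wake every thread waiting in awaitOpen()
    }

    synchronized boolean isOpen() {
        return open;
    }

    /** Blocks until the gate is open; the loop guards against spurious wake-ups. */
    synchronized void awaitOpen() throws InterruptedException {
        while (!open) {
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumptionGate gate = new ConsumptionGate(false);
        Thread normalConsumer = new Thread(() -> {
            try {
                gate.awaitOpen(); // in a real source this would precede each poll
                System.out.println("normal consumer resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        normalConsumer.start();
        Thread.sleep(100); // stand-in for "the other consumer is still working"
        gate.open();
        normalConsumer.join();
    }
}
```

A source's cancel() would also need to interrupt or release a thread parked in awaitOpen(), otherwise cancellation could hang.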

Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-08 Thread Arvid Heise
Hi Salva, I already answered on SO [1], but I'll replicate it here: With Flink 1.9, you cannot dynamically broadcast to all channels anymore. Your StreamPartitioner has to statically specify if it's a broadcast with isBroadcast. Then, selectChannel is never invoked. Do you have a specific use ca

Re: Session Window with dynamic gap

2020-01-08 Thread Aljoscha Krettek
Hi Kristoff, There are no plans to add state support to the gap extractors, but you could do this using a two-step approach, i.e. have an operation in front of the window that keeps track of session gaps and enriches the message with the gap that should be used, and then the extractor extracts th
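The two-step approach can be sketched in plain Java. All names here are hypothetical: GapTracker stands in for the stateful operator in front of the window (in Flink, a KeyedProcessFunction holding the gap in keyed state such as ValueState<Long>; a HashMap replaces it here), and extractGap stands in for the body of a SessionWindowTimeGapExtractor passed to EventTimeSessionWindows.withDynamicGap(...). The point is that the extractor stays stateless and only reads a field.

```java
import java.util.HashMap;
import java.util.Map;

class GapEnrichment {

    /** Hypothetical enriched message: payload plus the gap its session should use. */
    static final class Enriched {
        final String key;
        final String payload;
        final long gapMillis;

        Enriched(String key, String payload, long gapMillis) {
            this.key = key;
            this.payload = payload;
            this.gapMillis = gapMillis;
        }
    }

    /** Step 1: the operator in front of the window that tracks per-key gaps. */
    static final class GapTracker {
        private final Map<String, Long> gapByKey = new HashMap<>();
        private final long defaultGapMillis;

        GapTracker(long defaultGapMillis) {
            this.defaultGapMillis = defaultGapMillis;
        }

        /** Hypothetical control input that changes the gap for one key. */
        void updateGap(String key, long gapMillis) {
            gapByKey.put(key, gapMillis);
        }

        Enriched enrich(String key, String payload) {
            return new Enriched(key, payload, gapByKey.getOrDefault(key, defaultGapMillis));
        }
    }

    /** Step 2: the gap extractor just reads the field set in step 1. */
    static long extractGap(Enriched e) {
        return e.gapMillis;
    }
}
```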

Re: Late outputs for Session Window

2020-01-08 Thread KristoffSC
Hi, thank you for your SO comment [1]. You are right. Sorry, I misunderstood the "late message" concept. In fact, I was never sending "late events" that should match a just-ended window. Thank you for your comments and clarification. [1] https://stackoverflow.com/questions/59570445/late-outpu

Re: Late outputs for Session Window

2020-01-08 Thread Arvid Heise
Hi Kristoff, please check my SO comment and reply. https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942 It's not entirely clear to me why it's not working but I also don't quite understand your use case yet (data examples missing). Best,

Re: Using redis cache in flink

2020-01-08 Thread Yun Tang
Hi Navneeth If you need the Redis cache to be fault tolerant, I am afraid you have to choose a Redis cluster, since after a job failover Flink might deploy a task on a different node than before. If you don't care about the fault tolerance, you could implement a customized opera
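For the non-fault-tolerant option, a customized operator could keep a bounded local cache in front of Redis. A minimal sketch, assuming an LRU policy built on LinkedHashMap in access order; the loader function stands in for the Redis lookup and is hypothetical. In a rich function the cache would typically be created in open(), so each parallel instance has its own copy, and its contents are simply lost on failover.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Bounded per-operator cache; a miss falls through to the (hypothetical) remote lookup. */
class LocalLruCache<K, V> {
    private final Function<K, V> loader;
    private final LinkedHashMap<K, V> entries;

    LocalLruCache(int capacity, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true: iteration order runs from least to most recently used
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity; // evict the least recently used entry
            }
        };
    }

    /** Returns the cached value, loading and caching it on a miss. */
    V get(K key) {
        return entries.computeIfAbsent(key, loader);
    }

    boolean contains(K key) {
        return entries.containsKey(key); // does not change access order
    }
}
```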

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin
Ah, I had checked resource usage and GC from the Flink dashboard. It seems that the cause is not a CPU or memory issue. Task heap memory usage is less than 30%. Could you kindly tell me how I can see more metrics to help pinpoint the bottleneck? Really appreciate it. At 2020-01-08 15:59:17, "

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin
hi, godfreyhe As far as I can see, I rewrote the running SQL from a single-level count distinct into a two-level aggregation, just as the table.optimizer.distinct-agg.split.enabled parameter does. Correct me if I am wrong. But the rewritten SQL does not perform well in terms of throughput. For n
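Why the two-level rewrite returns the same result: every distinct value hashes into exactly one bucket, so the per-bucket distinct counts can be summed safely. The usual motivation is that the first level spreads a hot group key over several buckets, and therefore over several parallel instances. The sketch below simulates both forms in plain Java; it is not the plan Flink generates, and the bucket count and String.hashCode() bucketing are illustrative choices. Rows are hypothetical {groupKey, distinctValue} pairs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SplitDistinct {

    /** Single level: COUNT(DISTINCT value) GROUP BY key. */
    static Map<String, Integer> direct(List<String[]> rows) {
        Map<String, Set<String>> seen = new HashMap<>();
        for (String[] r : rows) {
            seen.computeIfAbsent(r[0], k -> new HashSet<>()).add(r[1]);
        }
        Map<String, Integer> out = new HashMap<>();
        seen.forEach((k, s) -> out.put(k, s.size()));
        return out;
    }

    /** Two levels: distinct count per (key, bucket), then the sum over buckets. */
    static Map<String, Integer> split(List<String[]> rows, int buckets) {
        // Level 1: GROUP BY key, MOD(hash(value), buckets)
        Map<String, Map<Integer, Set<String>>> level1 = new HashMap<>();
        for (String[] r : rows) {
            int bucket = Math.floorMod(r[1].hashCode(), buckets);
            level1.computeIfAbsent(r[0], k -> new HashMap<>())
                  .computeIfAbsent(bucket, b -> new HashSet<>())
                  .add(r[1]);
        }
        // Level 2: GROUP BY key, SUM(partial distinct counts)
        Map<String, Integer> out = new HashMap<>();
        level1.forEach((k, perBucket) -> {
            int sum = 0;
            for (Set<String> s : perBucket.values()) {
                sum += s.size();
            }
            out.put(k, sum);
        });
        return out;
    }
}
```

Whether the split form is actually faster depends on key skew and state access costs, so it is not guaranteed to improve throughput.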