Re: Flink Job cluster scalability

2020-01-08 Thread Zhu Zhu
Hi KristoffSC,

Each task needs a slot to run. However, Flink enables slot sharing[1] by
default so that one slot can host one parallel instance of each task in a
job. That's why your job can start with 6 slots.
However, different parallel instances of the same task cannot share a slot.
That's why you need at least 6 slots to run your job.

You can put tasks into different slot sharing groups via
'.slotSharingGroup(xxx)' to force certain tasks not to share slots. This
keeps the tasks from burdening each other. However, in this way the job
will need more slots to start.
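
To make the slot arithmetic above concrete, here is a minimal sketch in plain
Java (no Flink dependency). It assumes that each slot sharing group needs as
many slots as the highest parallelism among its tasks and that the groups'
requirements add up. The class name, the group names and the parallelisms are
made up for illustration and are not taken from the job in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class SlotEstimate {

    /**
     * Estimates the minimum number of slots. Within one slot sharing group a
     * slot can hold one parallel instance of every task, so the group needs
     * as many slots as its highest parallelism. Groups do not share slots,
     * so their requirements are summed.
     */
    static int requiredSlots(String[] groups, int[] parallelism) {
        if (groups.length != parallelism.length) {
            throw new IllegalArgumentException("one parallelism per task expected");
        }
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelism[i], Integer::max);
        }
        int slots = 0;
        for (int max : maxPerGroup.values()) {
            slots += max;
        }
        return slots;
    }

    public static void main(String[] args) {
        // Hypothetical parallelisms, chosen only for illustration.
        int[] parallelism = {1, 2, 2, 4, 4, 4, 6};
        String[] allShared = {"default", "default", "default", "default",
                              "default", "default", "default"};
        String[] oneIsolated = {"default", "default", "default", "isolated",
                                "default", "default", "default"};
        System.out.println(requiredSlots(allShared, parallelism));
        System.out.println(requiredSlots(oneIsolated, parallelism));
    }
}
```

With everything in the default group the estimate equals the highest
parallelism; moving a single task into its own group adds that task's
parallelism on top.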

So for your questions:
#1 yes
#2 ATM, you will need to resubmit your job with the adjusted parallelism.
The rescale cli was experimental and was temporarily removed [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html

Thanks,
Zhu Zhu

On Thu, Jan 9, 2020 at 1:05 AM KristoffSC wrote:

> Hi all,
> I must say I'm very impressed by Flink and what it can do.
>
> I was trying to play around with Flink operator parallelism and scalability
> and I have few questions regarding this subject.
>
> My setup is:
> 1. Flink 1.9.1
> 2. Docker Job Cluster, where each Task manager has only one task slot. I'm
> following [1]
> 3. env setup:
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
> env.setParallelism(1);
> env.setMaxParallelism(128);
> env.enableCheckpointing(10 * 60 * 1000);
>
> Please mind that I am using operator chaining here.
>
> My pipeline setup:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
>
>
>
> As you can see I have 7 operators (a few of them were actually chained, which
> is OK) with different parallelism levels. This gives me 23 tasks in
> total.
>
>
> I've noticed that with the "one task manager = one task slot" approach I have
> to have 6 task slots/task managers to be able to start this pipeline.
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
>
>
> If the number of task slots is lower than 6, the job is scheduled but not
> started.
>
> With 6 task slots everything works fine, and I must say that I'm very
> impressed with the way Flink balanced data between task slots. Data was
> distributed very evenly between operator instances/tasks.
>
> In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
> to be reused by more than one operator. While inspecting the UI I found
> examples of such operators. This is what I was expecting, though.
>
> However I was surprised a little bit after I added one additional task
> manager (hence one new task slot)
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
>
>
> After adding the new resources, Flink did not rebalance/redistribute the
> graph, so this host was sitting there doing nothing. Even after putting
> some load on the cluster, this node was still not used.
>
>
> *After doing this exercise I have a few questions:*
>
> 1. It seems that the number of task slots must be equal to or greater than
> the maximum parallelism used in the pipeline. In my case it was 6. When I
> changed the parallelism of one of the operators to 7, I had to have 7 task
> slots (task managers in my setup) to be able to even start the job.
> Is this the case?
>
> 2. What can I do to use the extra node that was spawned while the job was
> running?
> In other words, if I see that one of my nodes has too much load, what can I
> do? Please mind that I'm using a keyBy/hashing function in my pipeline, and
> in my tests I had around 5000 unique keys.
>
> I've tried to use the REST API to call "rescale" but I got this response:
> /302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
>
> Thanks.
>
> [1]
>
> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Jayant Ameta
Hi,
The elastic connector is packaged in the uber jar that is submitted. There
is only 1 version of the connector:
flink-connector-elasticsearch5_2.11:1.7.1
I'm using Flink 1.7.1

I couldn't figure out whether this error causes the job to fail, or whether
I see this error when the job is restarting after some other failure.
But, the occurrence of this error and job restarts is correlated.


Jayant Ameta


On Wed, Jan 8, 2020 at 6:47 PM Arvid Heise  wrote:

> Hi Jayant,
>
> if you only see it sometimes that indicates that you have it in two
> different versions of the connectors where class loader order is
> non-deterministic. Could you post the classpath?
>
> Btw, it's always good to add which Flink version you use.
>
> Best,
>
> Arvid
>
> On Wed, Jan 8, 2020 at 12:20 PM Jayant Ameta  wrote:
>
>> Hi,
>> I see the following error sometimes on my flink job, even though the
>> class is present in my uber jar.
>>
>> java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
>> at
>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111)
>> ... 17 common frames omitted Wrapped by:
>> org.elasticsearch.ElasticsearchException: java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
>> at
>> org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
>> at
>> org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
>> at
>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>> at
>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>> at
>> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>> ... 23 frames truncated
>>
>>
>> Jayant
>>
>


Re: managedMemoryInMB failure

2020-01-08 Thread Xintong Song
Hi Fanbin,


> On YARN setups, this value is automatically configured to the size of the
> TaskManager's YARN container, minus a certain tolerance value.
>
If I understand correctly, you are running a Flink standalone cluster both in
docker and on EMR? If that is the case, then this sentence has nothing to
do with your case, because it describes YARN deployments.

It should also be irrelevant that you are using a machine with more
memory on EMR, as long as "taskmanager.heap.size" is the same. In your
case, I assume the default 1024m is used in both scenarios?

If "taskmanager.memory.size" is not explicitly specified, Flink will
automatically decide the managed memory size. The derived size of managed
memory depends on the JVM free memory after launching the TM. Flink
triggers a "System.gc()" after the TM is started, and reads the JVM free
heap size after it. I guess the reason that decreasing the docker cpu cores
works might be that fewer cpu cores somehow result in less heap memory
consumption, leaving more free heap memory and thus more managed memory.
AFAIK, there are several places in the TM where Flink reads the number of
system cpu cores and decides thread pool sizes accordingly. But this is just
my guess and I cannot confirm it.

I would suggest that you configure "taskmanager.memory.size" explicitly
anyway, to avoid potential problems caused by the uncertainty of the JVM free
heap memory size. BTW, this randomness is eliminated in Flink 1.10.
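
As a small worked example of the sizing rule quoted further down
(managedMemoryPerSlot * numberOfSlots), the following plain-Java sketch
computes the value and prints it as a flink-conf.yaml line. The 138m per slot
comes from the log in this thread; the slot count of 4 and the class and
method names are assumptions made only for illustration.

```java
final class ManagedMemorySetting {

    /** Builds the config line for a given per-slot requirement and slot count. */
    static String configLine(int managedMemoryPerSlotMb, int numberOfSlots) {
        // multiplyExact guards against silent overflow for absurd inputs.
        int totalMb = Math.multiplyExact(managedMemoryPerSlotMb, numberOfSlots);
        return "taskmanager.memory.size: " + totalMb + "m";
    }

    public static void main(String[] args) {
        // 138 MB per slot (from the log), 4 slots per TM (assumed).
        System.out.println(configLine(138, 4));
    }
}
```

For a TaskManager with a different number of slots, only the second argument
changes.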

Thank you~

Xintong Song



On Thu, Jan 9, 2020 at 3:04 AM Fanbin Bu  wrote:

> Xintong,
>
> Thanks for looking into this. I changed docker setting of #CPUs to a lower
> number and it works now.
> I was using the same code and same flink version. The reason that it works
> on EMR is that I'm using a machine with large memory.
> According to the doc:
> *JVM heap size for the TaskManagers, which are the parallel workers of the
> system. On YARN setups, this value is automatically configured to the size
> of the TaskManager's YARN container, minus a certain tolerance value.*
>
> The default value for JVM heap size is 1024m and I was configuring docker
> to have 6 CPUs and that failed blink batch jobs.
>
> Thanks for your help!
> Fanbin
>
> On Tue, Jan 7, 2020 at 7:51 PM Xintong Song  wrote:
>
>> Hi Fanbin,
>>
>> The blink planner batch sql operators requires managed memory, and the
>> amount of managed memory needed depends on your job. The failure is because
>> the slot, according to your cluster configurations, does not have enough
>> managed memory to fulfill the requests.
>>
>> To fix the problem, you would need to configure more managed memory for
>> your task executors. You can set the config option
>> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
>> your case) * numberOfSlots'.
>>
>> It's not clear to me why the exactly same code works on emr. Were you
>> running the same version of flink?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu  wrote:
>>
>>> Hi,
>>>
>>> with Flink 1.9 running in docker mode, I have a batch job and got the
>>> following error message.
>>>
>>> However, it works totally fine with the same code on EMR. I checked the
>>> log and here is the only difference:
>>> managedMemoryInMB=138 . (the working ones has 0 value)
>>>
>>> did anybody see this before?
>>> Thanks,
>>> Fanbin
>>>
>>>
>>> org.apache.flink.runtime.jobmanager.scheduler.
>>> NoResourceAvailableException: No pooled slot available and request to
>>> ResourceManager for new slot failed
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(
>>> CompletableFuture.java:774)
>>> at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
>>> CompletableFuture.java:792)
>>> at java.util.concurrent.CompletableFuture.whenComplete(
>>> CompletableFuture.java:2153)
>>> at org.apache.flink.runtime.concurrent.FutureUtils
>>> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>>> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>>> .allocateSharedSlot(SchedulerImpl.java:341)
>>> at 

Re:Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin
hi,
Thanks for the reply. I am using default FsStateBackend rather than rocksdb 
with checkpoint off. So I really cannot see any state info from the dashboard. 
I will research more details and see if any alternative can be optimized. 











At 2020-01-08 19:07:08, "Benchao Li"  wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use the RocksDB state backend, your job may be
>bound by slow disk IO.
>You can check the WindowOperator's latency metric to see how long it takes to
>process an element.
>Hope this helps.
>
>On Wed, Jan 8, 2020 at 4:04 PM sunfulin wrote:
>
>> Ah, I had checked resource usage and GC from the flink dashboard. It seems
>> that the cause is not a cpu or memory issue. Task heap memory usage is less
>> than 30%. Could you kindly tell me how I can see more metrics to help find
>> the bottleneck?
>> I would really appreciate that.
>>
>>
>>
>>
>>
>> At 2020-01-08 15:59:17, "Kurt Young"  wrote:
>>
>> Hi,
>>
>> Could you try to find out what the bottleneck of your current job is? This
>> would lead to different optimizations, for example depending on whether it
>> is CPU bound, or whether the local state is too big and the job is stuck
>> on too many slow IOs.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:
>>
>>> hi sunfulin,
>>> you can try the blink planner (since 1.9+), which optimizes distinct
>>> aggregation. you can also try to enable
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>>
>>> best,
>>> godfreyhe
>>>
>>> On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:
>>>
>>>> Hi, community,
>>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>>> In one scenario I'm trying to count(distinct deviceID) over a data set of
>>>> about 100GB in realtime, and sink the aggregated results to an
>>>> ElasticSearch index. I met a severe performance issue when running my
>>>> flink job and would like to get some help from the community.
>>>>
>>>> Flink version : 1.8.2
>>>> Running on yarn with 4 yarn slots per task manager. My flink task
>>>> parallelism is set to 10, which is equal to the number of my kafka source
>>>> partitions. After running the job, I can observe high backpressure from
>>>> the flink dashboard. Any suggestions and any kind of help are highly
>>>> appreciated.
>>>>
>>>> The running sql is like the following:
>>>>
>>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>>>> clkCnt from
>>>> (
>>>>   SELECT
>>>>     aggId,
>>>>     pageId,
>>>>     statkey,
>>>>     COUNT(DISTINCT deviceId) as cnt
>>>>   FROM
>>>>   (
>>>>     SELECT
>>>>       'ZL_005' as aggId,
>>>>       'ZL_UV_PER_MINUTE' as pageId,
>>>>       deviceId,
>>>>       ts2Date(recvTime) as statkey
>>>>     from
>>>>       kafka_zl_etrack_event_stream
>>>>   )
>>>>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>> ) as t1
>>>> group by aggId, pageId, statkey
>>>>
>>>> Best
>>>
>>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn
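
For readers following the query quoted above: its inner GROUP BY on
MOD(hashCode(deviceId), 1024) splits the distinct count into buckets, and the
outer sum(cnt) adds the per-bucket results back together. The plain-Java
sketch below (no Flink involved) illustrates why that gives the same answer as
a single COUNT(DISTINCT): every device id lands in exactly one bucket. The
class and method names are made up, and Math.floorMod is a choice made here to
keep bucket numbers non-negative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SplitDistinctCount {

    /** Counts distinct ids per hash bucket, then sums the partial counts. */
    static long splitDistinctCount(List<String> deviceIds, int buckets) {
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String id : deviceIds) {
            // Equal ids always hash to the same bucket, so no id is counted twice.
            int bucket = Math.floorMod(id.hashCode(), buckets);
            perBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(id);
        }
        long total = 0;
        for (Set<String> ids : perBucket.values()) {
            total += ids.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("dev-1", "dev-2", "dev-1", "dev-3", "dev-2");
        System.out.println(splitDistinctCount(ids, 1024));
    }
}
```

The benefit of the split is that each bucket can be aggregated by a different
parallel instance, which is what helps when a few group keys dominate the data.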


Re: Flink logging issue with logback

2020-01-08 Thread Bajaj, Abhinav
Thanks Dawid, Max and Yang for confirming the issue and providing a potential 
workaround.

On 1/8/20, 3:24 AM, "Maximilian Michels"  wrote:

Interesting that we came across this problem at the same time. We have 
observed this with Lyft's K8s operator which uses the Rest API for job 
submission, much like the Flink dashboard.

Note that you can restore the original stdout/stderr in your program:

    // Requires java.io.FileDescriptor, java.io.FileOutputStream and
    // java.io.PrintStream.
    private static void restoreStdOutAndStdErr() {
        System.setOut(new PrintStream(
            new FileOutputStream(FileDescriptor.out)));
        System.setErr(new PrintStream(
            new FileOutputStream(FileDescriptor.err)));
    }

Just call restoreStdOutAndStdErr() before you start building the Flink 
job. Of course, this is just meant to be a workaround.

I think an acceptable solution is to always print upon execution. For 
the plan preview we may keep the existing behavior.

Cheers,
Max

On 07.01.20 17:39, Dawid Wysakowicz wrote:
> A quick update. The suppression of stdout/stderr actually might soon be 
> dropped, see: 
> https://issues.apache.org/jira/browse/FLINK-15504
> 
> Best,
> 
> Dawid
> 
> On 07/01/2020 07:17, Yang Wang wrote:
>> Hi Bajaj,
>>
>> I have tested just as you say, and find that the logs in the user 
>> class could not show up when
>> using ConsoleAppender. If using FileAppender instead, everything goes 
>> well.
>>
>> It is so weird and i have no idea how to debug it.
>> Best,
>> Yang
>>
>> On Tue, Jan 7, 2020 at 4:28 AM Bajaj, Abhinav wrote:
>>
>> Hi,
>>
>> Thanks much for the responses.
>>
>> Let me add some more details and clarify my question.
>>
>> _Setup_
>>
>>   * I used the WikipediaAnalysis example and added a log in main
>> method.
>>
>> ……
>>
>> public static void main(String[] args) throws Exception {
>>   StreamExecutionEnvironment see =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> LOG.info("Info log for test");
>>
>> DataStream<WikipediaEditEvent> edits = see.addSource(new
>> WikipediaEditsSource());
>>
>> ……
>>
>>   * I am using the Flink 1.7.1 distribution and starting
>> jobmanager and taskmanager locally using the below commands –
>>   o ./bin/jobmanager.sh start-foreground
>>   o ./bin/taskmanager.sh start-foreground
>>   o Both jobmanager and taskmanager log in the console now
>>   o JVM options are correctly set and verified from jobmanager
>> & taskmanager logs
>>
>>   * I submit the WikipediaAnalysis job from Flink dashboard and
>> checked the jobmanager logs
>>
>> _Run 1_: Flink is using the default log4j logging
>>
>>   * Jobmanager logs the added info log from the job
>>   o 2020-01-06 11:55:37,422 INFO wikiedits.WikipediaAnalysis -
>> Info log for test
>>
>> _Run 2_: Flink is setup to use logback as suggested in Flink
>> documentation here
>> 

>>
>>   * Jobmanager does not log the added info log from the job
>>
>> So, it seems there is a logging behavior difference between using
>> log4j & logback in Flink.
>>
>> Is this expected or a known difference?
>>
>> Thanks again,
>>
>> Abhinav Bajaj
>>
>> _PS_: Ahh. I see how my email was confusing the first time.
>> Hopefully this one is better :P
>>
>> *From: *Dawid Wysakowicz
>> *Date: *Monday, January 6, 2020 at 5:13 AM
>> *Cc: *"Bajaj, Abhinav", "user@flink.apache.org"
>> *Subject: *Re: Flink logging issue with logback
>>
>> Hi Bajaj,
>>
>> I am not entirely sure what is the actual issue you are seeking
>> help, but let me comment on your observations.
>>
>> Ad. 1
>>
>> If you log to the console from the main method this is an expected
>> behavior in both cases (log4j, logback). The std out is 

How can I find out which key group belongs to which subtask

2020-01-08 Thread 杨东晓
Hi, I'm trying to optimize a Flink 'keyBy' process function. Is
there any way I can find out which key group a key belongs to,
and which subtask a key group belongs to?
The motivation is that we want to force the data records
from upstream to still go to a downstream subtask in the same taskmanager.
That means even if we use a keyed stream function we still want no cross-JVM
communication to happen at run time.
And if we can achieve that, can we also avoid the expensive cost of
record serialization, because data is only transferred within the same
taskmanager JVM instance?

Thanks.
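
For context on the key-to-subtask mapping asked about above, here is a sketch
in plain Java of the two-step assignment as I understand it from Flink's
KeyGroupRangeAssignment: the key's hashCode is scrambled with a murmur hash and
taken modulo the max parallelism to get the key group, and the key group is
then scaled to a subtask index. The hash below is reproduced from memory of
Flink's MathUtils.murmurHash and should be treated as an assumption; in a real
job it is safer to call Flink's own utility class than to rely on this copy.
The class and method names here are made up.

```java
final class KeyGroupSketch {

    /** Final mixing step of the 32-bit murmur hash. */
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** Murmur-style scrambling of an int; the result is always non-negative. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    /** Key group of a key: in the range [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Subtask index that owns a key group: in the range [0, parallelism). */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value
        int parallelism = 6;      // assumed value
        int keyGroup = keyGroupFor("some-key", maxParallelism);
        int subtask = subtaskFor(keyGroup, parallelism, maxParallelism);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
```

Note that this only tells you where a key ends up; it does not by itself keep
records on the same taskmanager as the upstream subtask.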


Re: kafka: how to stop consumption temporarily

2020-01-08 Thread David Morin
Awesome!
I'm going to implement it.
Thanks a lot Arvid.
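
The idea discussed in the quoted messages below (one source wrapping both
consumers and holding back the normal one while a dump is running) can be
sketched without any Flink or Kafka API. Everything here is hypothetical: the
class name, the callback names, and in particular the way the end of a dump is
signalled, which the thread leaves open. It is not the code from the linked
gist.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class GatedSource<T> {
    private final Deque<T> dumpBuffer = new ArrayDeque<>();
    private final Deque<T> cdcBuffer = new ArrayDeque<>();
    private boolean dumpInProgress;

    /** A record from the dump topic arrived; this also marks a dump as running. */
    void onDumpRecord(T record) {
        dumpInProgress = true;
        dumpBuffer.add(record);
    }

    /** Assumed external signal that the dump is complete. */
    void onDumpFinished() {
        dumpInProgress = false;
    }

    /** A record from the CDC topic arrived; it is held until it may be emitted. */
    void onCdcRecord(T record) {
        cdcBuffer.add(record);
    }

    /** Next record to emit, or null if nothing may be emitted right now. */
    T poll() {
        if (!dumpBuffer.isEmpty()) {
            return dumpBuffer.poll();
        }
        if (dumpInProgress) {
            return null; // CDC output is blocked while the dump is running
        }
        return cdcBuffer.poll();
    }
}
```

A real source would stop reading from the CDC consumer instead of buffering
without bound, and would need to include both buffers in its checkpointed
state.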

On Wed, Jan 8, 2020 at 12:00, Arvid Heise wrote:

> I'd second Chesnay's suggestion to use a custom source. It would be a
> piece of cake with FLIP-27 [1], but we are not there yet unfortunately.
> It's probably in Flink 1.11 (mid year) if you can wait.
>
> The current way would be a source that wraps the two KafkaConsumer and
> blocks the normal consumer from outputting elements. Here is a quick and
> dirty solution that I threw together:
> https://gist.github.com/AHeise/d7a8662f091e5a135c5ccfd6630634dd .
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> On Mon, Jan 6, 2020 at 1:16 PM David Morin 
> wrote:
>
>> My naive solution can't work because a dump can be quite long.
>> So, yes I have to find a way to stop the consumption from the topic used
>> for streaming mode when a dump is done :(
>> Terry, I try to implement something based on your reply and based on this
>> thread
>> https://stackoverflow.com/questions/59201503/flink-kafka-gracefully-close-flink-consuming-messages-from-kafka-source-after-a
>> Any suggestions are welcomed
>> thx.
>>
>> David
>>
>> On 2020/01/06 09:35:37, David Morin  wrote:
>> > Hi,
>> >
>> > Thanks for your replies.
>> > Yes Terry. You are right. I can try to create a custom source.
>> > But perhaps, according to my use case, I figured out I can use a
>> technical field in my data. This is a timestamp and I think I just have to
>> ignore late events with watermarks or later in the pipeline according to
>> metadata stored in the Flink state. I test it now...
>> > Thx
>> >
>> > David
>> >
>> > On 2020/01/03 15:44:08, Chesnay Schepler  wrote:
>> > > Are you asking how to detect from within the job whether the dump is
>> > > complete, or how to combine these 2 jobs?
>> > >
>> > > If you had a way to notice whether the dump is complete, then I would
>> > > suggest to create a custom source that wraps 2 kafka sources, and
>> switch
>> > > between them at will based on your conditions.
>> > >
>> > >
>> > > On 03/01/2020 03:53, Terry Wang wrote:
>> > > > Hi,
>> > > >
>> > > > I’d like to share my opinion here. It seems that you need adjust
>> the Kafka consumer to have communication each other. When your begin the
>> dump process, you need to notify another CDC-topic consumer to wait idle.
>> > > >
>> > > >
>> > > > Best,
>> > > > Terry Wang
>> > > >
>> > > >
>> > > >
>> > > >> On Jan 2, 2020, at 16:49, David Morin wrote:
>> > > >>
>> > > >> Hi,
>> > > >>
>> > > >> Is there a way to stop temporarily to consume one kafka source in
>> streaming mode ?
>> > > >> Use case: I have to consume 2 topics but in fact one of them is
>> more prioritized.
>> > > >> One of this topic is dedicated to ingest data from db (change data
>> capture) and one of them is dedicated to make a synchronization (a dump
>> i.e. a SELECT ... from db). At the moment the last one is performed by one
>> Flink job and we start this one after stop the previous one (CDC) manually
>> > > >> I want to merge these 2 modes and automatically stop consumption
>> of the topic dedicated to the CDC mode when a dump is done.
>> > > >> How to handle that with Flink in a streaming way ? backpressure ?
>> ...
>> > > >> Thx in advance for your insights
>> > > >>
>> > > >> David
>> > > >
>> > >
>> > >
>> >
>>
>


Re: Session Window with dynamic gap

2020-01-08 Thread KristoffSC
Hi Aljoscha,
Thanks for the response.

This sounds ok for me. It's as if the message carries additional information
that can "tell" operators how to handle this message. Maybe we could use
this approach also for different use cases.

I will try this approach, thanks.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: managedMemoryInMB failure

2020-01-08 Thread Fanbin Bu
Xintong,

Thanks for looking into this. I changed docker setting of #CPUs to a lower
number and it works now.
I was using the same code and same flink version. The reason that it works
on EMR is that I'm using a machine with large memory.
According to the doc:
*JVM heap size for the TaskManagers, which are the parallel workers of the
system. On YARN setups, this value is automatically configured to the size
of the TaskManager's YARN container, minus a certain tolerance value.*

The default value for JVM heap size is 1024m and I was configuring docker
to have 6 CPUs and that failed blink batch jobs.

Thanks for your help!
Fanbin

On Tue, Jan 7, 2020 at 7:51 PM Xintong Song  wrote:

> Hi Fanbin,
>
> The blink planner batch sql operators requires managed memory, and the
> amount of managed memory needed depends on your job. The failure is because
> the slot, according to your cluster configurations, does not have enough
> managed memory to fulfill the requests.
>
> To fix the problem, you would need to configure more managed memory for
> your task executors. You can set the config option
> "taskmanager.memory.size" to the value of 'managedMemoryPerSlot (138m in
> your case) * numberOfSlots'.
>
> It's not clear to me why the exactly same code works on emr. Were you
> running the same version of flink?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 8, 2020 at 8:18 AM Fanbin Bu  wrote:
>
>> Hi,
>>
>> with Flink 1.9 running in docker mode, I have a batch job and got the
>> following error message.
>>
>> However, it works totally fine with the same code on EMR. I checked the
>> log and here is the only difference:
>> managedMemoryInMB=138 . (the working ones has 0 value)
>>
>> did anybody see this before?
>> Thanks,
>> Fanbin
>>
>>
>> org.apache.flink.runtime.jobmanager.scheduler.
>> NoResourceAvailableException: No pooled slot available and request to
>> ResourceManager for new slot failed
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .slotRequestToResourceManagerFailed(SlotPoolImpl.java:357)
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .lambda$requestSlotFromResourceManager$1(SlotPoolImpl.java:345)
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(
>> CompletableFuture.java:774)
>> at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(
>> CompletableFuture.java:792)
>> at java.util.concurrent.CompletableFuture.whenComplete(
>> CompletableFuture.java:2153)
>> at org.apache.flink.runtime.concurrent.FutureUtils
>> .whenCompleteAsyncIfNotDone(FutureUtils.java:940)
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestSlotFromResourceManager(SlotPoolImpl.java:339)
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestNewAllocatedSlotInternal(SlotPoolImpl.java:306)
>> at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
>> .requestNewAllocatedBatchSlot(SlotPoolImpl.java:448)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .requestNewAllocatedSlot(SchedulerImpl.java:262)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateMultiTaskSlot(SchedulerImpl.java:542)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateSharedSlot(SchedulerImpl.java:341)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .internalAllocateSlot(SchedulerImpl.java:168)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateSlotInternal(SchedulerImpl.java:149)
>> at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl
>> .allocateBatchSlot(SchedulerImpl.java:129)
>> at org.apache.flink.runtime.executiongraph.
>> SlotProviderStrategy$BatchSlotProviderStrategy.allocateSlot(
>> SlotProviderStrategy.java:109)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .lambda$allocateAndAssignSlotForExecution$2(Execution.java:556)
>> at java.util.concurrent.CompletableFuture.uniComposeStage(
>> CompletableFuture.java:995)
>> at java.util.concurrent.CompletableFuture.thenCompose(
>> CompletableFuture.java:2137)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .allocateAndAssignSlotForExecution(Execution.java:554)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .allocateResourcesForExecution(Execution.java:496)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleForExecution(Execution.java:439)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex
>> .scheduleForExecution(ExecutionVertex.java:674)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleConsumer(Execution.java:850)
>> at org.apache.flink.runtime.executiongraph.Execution
>> .scheduleOrUpdateConsumers(Execution.java:887)
>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(
>> Execution.java:1064)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph
>> 

RE: Table API: Joining on Tables of Complex Types

2020-01-08 Thread Hailu, Andreas
Very well - I'll give this a try. Thanks, Dawid.

// ah

From: Dawid Wysakowicz
Sent: Wednesday, January 8, 2020 7:21 AM
To: Hailu, Andreas [Engineering]; user@flink.apache.org
Cc: Richards, Adam S [Engineering]
Subject: Re: Table API: Joining on Tables of Complex Types


Hi Andreas,

Converting your GenericRecords to Rows would definitely be the safest option. 
You can check how it's done in 
org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can reuse the 
logic from there to write something like:

DataSet<GenericRecord> dataset = ...

dataset.map( /* convert GenericRecord to Row */ )
    .returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

Another thing you could try is to make sure that GenericRecord is seen as an 
avro type by Flink (Flink should understand that an avro type is a complex type):

dataset.returns(new GenericRecordAvroTypeInfo(/*schema string*/))

Then the TableEnvironment should pick it up as a structured type and flatten it 
automatically when registering the Table. Bear in mind that the returns method 
is part of SingleInputUdfOperator, so you can apply it right after some 
transformation, e.g. map/flatMap etc.

Best,

Dawid


On 06/01/2020 18:03, Hailu, Andreas wrote:
Hi David, thanks for getting back.

From what you've said, I think we'll need to convert our GenericRecord into 
structured types - do you have any references or examples I can have a look 
at? If not, perhaps you could just show me a basic example of flattening a 
complex object with accessors into a Table of structured types. Or by 
structured types, did you mean Row?

// ah

From: Dawid Wysakowicz 
Sent: Monday, January 6, 2020 9:32 AM
To: Hailu, Andreas [Engineering] 
; 
user@flink.apache.org
Cc: Richards, Adam S [Engineering] 

Subject: Re: Table API: Joining on Tables of Complex Types


Hi Andreas,

First of all I would highly recommend converting non-structured types to 
structured types as soon as possible, as it opens up more possibilities to 
optimize the plan.

Have you tried:

Table users = 
batchTableEnvironment.fromDataSet(usersDataset).select("getField(f0, userName) 
as userName", "f0")
Table other = 
batchTableEnvironment.fromDataSet(otherDataset).select("getField(f0, userName) 
as user", "f1")

Table result = other.join(users, "user = userName")

You could also check how the 
org.apache.flink.formats.avro.AvroRowDeserializationSchema class is implemented 
which internally converts an avro record to a structured Row.

Hope this helps.

Best,

Dawid
On 03/01/2020 23:16, Hailu, Andreas wrote:
Hi folks,

I'm trying to join two Tables which are composed of complex types, Avro's 
GenericRecord to be exact. I have to use a custom UDF to extract fields out of 
the record and I'm having some trouble on how to do joins on them as I need to 
call this UDF to read what I need. Example below:

// GenericRecord field extractor
batchTableEnvironment.registerFunction("getField", new GRFieldExtractor());
// Converting from some pre-existing DataSet
Table users = batchTableEnvironment.fromDataSet(usersDataset);
Table otherDataset = batchTableEnvironment.fromDataSet(someOtherDataset);
// This is how the UDF is used, as GenericRecord is a complex type requiring
// you to invoke a get() method on the field you're interested in.
// Here we call get on the field 'userName'.
Table userNames = t.select("getField(f0, userName)");

I'd like to do something using the Table API similar to the query "SELECT * 
from otherDataset WHERE otherDataset.userName = users.userName". How is this 
done?

Best,
Andreas

The Goldman Sachs Group, Inc. All rights reserved.
See http://www.gs.com/disclaimer/global_email for important risk disclosures, 
conflicts of interest and other terms and conditions relating to this e-mail 
and your reliance on information contained in it.  This message may contain 
confidential or privileged information.  If you are not the intended recipient, 
please advise us immediately and delete this message.  See 
http://www.gs.com/disclaimer/email for further information on confidentiality 
and the risks of non-secure electronic communication.  If you cannot access 
these links, please notify us by reply message and we will send the contents to 
you.




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices




Flink Job claster scalability

2020-01-08 Thread KristoffSC
Hi all,
I must say I'm very impressed by Flink and what it can do.

I was trying to play around with Flink operator parallelism and scalability
and I have a few questions regarding this subject.

My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each Task manager has only one task slot. I'm
following [1]
3. env setup:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.setMaxParallelism(128);
env.enableCheckpointing(10 * 60 * 1000);

Please mind that I am using operator chaining here. 

My pipeline setup:

 


As you can see I have 7 operators (a few of them were actually chained, and
this is OK), with different parallelism levels. This all gives me 23 tasks
in total.


I've noticed that with the "one task manager = one task slot" approach I need
6 task slots/task managers to be able to start this pipeline.


 

If the number of task slots is lower than 6, the job is scheduled but not started.

With 6 task slots everything works fine, and I must say that I'm very
impressed with the way Flink balanced data between task slots. Data was
distributed very evenly between operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
to be reused by more than one operator. While inspecting the UI I found
examples of such operators. This is what I was expecting, though.

However, I was a little surprised after I added one additional task
manager (hence one new task slot).


 

After adding the new resources, Flink did not rebalance/redistribute the
graph. So this host was sitting there doing nothing. Even after putting
some load on the cluster, this node was still not used.

 
*After doing this exercise I have a few questions:*

1. It seems that the number of task slots must be equal to or greater than the
maximum parallelism used in the pipeline. In my case it was 6. When I
changed the parallelism of one of the operators to 7, I had to have 7 task slots
(task managers in my setup) to be able to even start the job.
Is this the case?

2. What can I do to use the extra node that was spawned while the job was
running?
In other words, if I see that one of my nodes has too much load, what can I
do? Please mind that I'm using a keyBy/hashing function in my pipeline and
in my tests I had around 5000 unique keys.

I tried to use the REST API to call "rescale" but I got this response:
302 {"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}
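
For question 1, the arithmetic can be sketched as below. This is a minimal
sketch, assuming every task stays in Flink's default slot sharing group, so
that one slot can hold one parallel instance of each task and the job needs
as many slots as its highest parallelism. The task names and parallelism
values are hypothetical, and minSlots is an illustrative helper, not a Flink
API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SlotEstimate {

    /**
     * Minimum number of slots for a job whose tasks all share the default
     * slot sharing group: the highest parallelism among its tasks.
     */
    public static int minSlots(Map<String, Integer> parallelismByTask) {
        int max = 0;
        for (int parallelism : parallelismByTask.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical tasks and parallelism values, for illustration only.
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("source", 1);
        job.put("enrich", 4);
        job.put("aggregate", 6);
        job.put("sink", 2);
        System.out.println("minimum slots: " + minSlots(job));
    }
}
```

Raising any single task to parallelism 7 raises the result to 7, which
matches the behaviour described in question 1. Tasks moved to a separate slot
sharing group would add their own slots on top of this number.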

Thanks.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


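Re: Viewing Flink operator state

2020-01-08 Thread Yun Tang
Hi

If checkpointing is not enabled but you want to know how much state storage is
in use, there is no good way to do that with FsStateBackend. With
RocksDBStateBackend, however, you can enable the RocksDB native metrics [1] to
observe the size of the memtables and SST files and use that to roughly
estimate the total amount of state data.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics

Best regards,
Yun Tang

From: sunfulin 
Sent: Wednesday, January 8, 2020 17:43
To: user-zh@flink.apache.org 
Subject: Viewing Flink operator state

How can I view statistics such as state storage usage through the dashboard if
checkpointing is not enabled?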


Re: How to verify if checkpoints are asynchronous or sync

2020-01-08 Thread RKandoji
I evaluated performance by looking at the number of input records processed
over 10-minute and 30-minute periods.

Thanks,
R

On Wed, Jan 8, 2020 at 2:21 AM Congxian Qiu  wrote:

> If you want to figure out the performance problem, maybe async-profiler [1]
> can be helpful
> [1] https://github.com/jvm-profiling-tools/async-profiler
> Best,
> Congxian
>
>
> William C wrote on Wed, Jan 8, 2020 at 11:37 AM:
>
>> Hallo
>>
>> on 2020/1/8 11:31, RKandoji wrote:
>> > I'm running my job on an EC2 instance with 32 cores and, according to the
>> > documentation, I tried to use as many task slots as the number of cores:
>> > numOfTaskSlots=32 and parallelism=32. But I noticed that the performance
>> > degrades slightly when I'm using 32 task slots. Performance seems
>> > better at 26 task slots than at more than 26 task slots. So I was trying to
>> > understand whether additional CPU cores are being utilized by checkpointing
>> > or any other async (or background) operations; in the process I was
>> > trying to verify whether the checkpointing is async.
>>
>> How did you evaluate the performance?
>> It may be due to busy IO, thread competition, or something else similar.
>> You'd better dig into more details via logging.
>>
>> Regards.
>>
>


Re: Using async io in cep

2020-01-08 Thread Dawid Wysakowicz
If I am not mistaken, my previous answer is still valid. There is no way
to have true asynchronicity within CEP conditions.


Why do you want to use async io there? Did you hit performance issues?
If so, you could try increasing the parallelism.


Best,

Dawid


On 07/01/2020 02:47, 郑 洁锋 wrote:
> Hi,
>         Our business is to dynamically query (such as left join/right
> join) the mysql / oracle database during the cep process, or is there
> any other way to achieve this function?
>
> 
> zjfpla...@hotmail.com
>
>  
> *From:* Dawid Wysakowicz 
> *Date:* 2020-01-06 18:09
> *To:* 郑 洁锋 ; user
> 
> *Subject:* Re: Using async io in cep
>
> Hi,
>
> You cannot use the Async IO as described here[1] in the CEP
> library, if that's what you are asking for.
>
> It is also not that straightforward to say what asynchronous
> processing would mean in that case. The primary use case for Async IO is to
> execute parallel computations on independent data. In the case of CEP
> this does not apply to the processing of records within a single key,
> as those assume strict ordering and in the general case depend on the
> results of processing previous records. One could think of
> asynchronous processing of records from different keys in a single
> parallel instance, but that would require careful handling of keys.
> If I am not mistaken, Async IO also does not support
> stateful processing on a keyed stream.
>
> Best,
>
> Dawid
>
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/asyncio.html
>
> On 06/01/2020 09:59, 郑 洁锋 wrote:
>> Hi,
>>         Is there a way to use asynchronous io to query the
>> database in the process of cep?
>>
>> 
>> zjfpla...@hotmail.com
>>
>>




Re: Submit high version compiled code jar to low version flink cluster?

2020-01-08 Thread Arvid Heise
If you explicitly need features that are only present in Flink 1.9, chances
are high that your code will fail on older versions. If it's just about
syntactic sugar, a valid option is to copy the new functions in your code
and use that with the old version.

However, if you refer to SQL queries then your approach would be better as
it's not easy to cherry-pick specific parts. Then your only option is to
try it out.

Another orthogonal approach would be to bundle Flink as a library with your
application and run it dockerized. Then all applications can use their own
Flink version.

On Mon, Dec 30, 2019 at 12:08 PM Yun Tang  wrote:

> Hi Lei
>
> It's better to use the SAME version to submit the job from the client side.
> Even if the major version of Flink is the same, compatibility is not
> guaranteed. There exists a known issue due to some classes missing
> 'serialVersionUID'. [1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-13910
>
> Best
> Yun Tang
> --
> *From:* tison 
> *Sent:* Monday, December 30, 2019 15:44
> *To:* wangl...@geekplus.com.cn 
> *Cc:* user 
> *Subject:* Re: Submit high version compiled code jar to low version flink
> cluster?
>
> It possibly fails with incompatibility. Flink doesn't promise such
> compatibility but it MIGHT work.
>
> Best,
> tison.
>
>
> wangl...@geekplus.com.cn wrote on Mon, Dec 30, 2019 at 3:17 PM:
>
>
> The Flink cluster version is 1.8.2.
> The application source code needs some features only supported in 1.9.1.
> So it is compiled with the flink-1.9.1 dependency and built into a fat jar
> with all the Flink dependencies.
> What will happen if I submit the jar built against the higher version to the
> lower-version Flink cluster?
>
> Thanks,
> Lei
>
>
>
>
>


Re: Flink Dataset to ParquetOutputFormat

2020-01-08 Thread Arvid Heise
Hi Anji,

StreamingFileSink has a BucketAssigner that you can use for that purpose.

From the javadoc: The sink uses a BucketAssigner to determine in which
bucket directory each element should be written to inside the base
directory. The BucketAssigner can, for example, use time or a property of
the element to determine the bucket directory. The default BucketAssigner
is a DateTimeBucketAssigner which will create one new bucket every hour.
You can specify a custom BucketAssigner using the
setBucketAssigner(bucketAssigner) method, after calling forRowFormat(Path,
Encoder) or forBulkFormat(Path, BulkWriter.Factory).
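
To make the "use a property of the element to determine the bucket directory"
part concrete, here is a minimal sketch of the date logic only, using plain
JDK classes. It assumes the element carries an event timestamp in epoch
milliseconds and that one directory per UTC day is wanted; the class name,
the "date=" prefix and the method name are hypothetical. In a real job a
string like this would be what a custom BucketAssigner returns from
getBucketId.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateBucket {

    // One bucket per calendar day, evaluated in UTC (an assumption of this sketch).
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Maps an event timestamp (epoch millis) to a per-day bucket directory name. */
    public static String bucketId(long eventTimeMillis) {
        return "date=" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // 1577836800000L is 2020-01-01T00:00:00Z.
        System.out.println(bucketId(1577836800000L));
    }
}
```

Deriving the bucket from a timestamp inside the element, rather than from the
wall clock, keeps late or replayed records in the directory of the day they
belong to.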

If that doesn't work for you, please let me know. Btw, are you using event
or processing time?

Best,

Arvid

On Fri, Dec 27, 2019 at 4:24 AM vino yang  wrote:

> Hi Anji,
>
> Actually, I am not familiar with how to partition via timestamp. Flink's
> streaming BucketingSink provides this feature.[1] You may refer to this
> link and customize your sink.
>
> I can ping a professional committer who knows more detail of FS connector
> than me, @kklou...@gmail.com  may give you help.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
>
> aj wrote on Fri, Dec 27, 2019 at 1:51 AM:
>
>> Thanks Vino.
>>
>> I am able to write data in Parquet now. But now the issue is how to write
>> a dataset to multiple output paths according to a timestamp partition.
>> I want to partition the data by date.
>>
>> Currently I am writing like this, which writes to a single output path.
>>
>> DataSet<Tuple2<Void, GenericRecord>> df = allEvents.flatMap(new
>> EventMapProcessor(schema.toString())).withParameters(configuration);
>>
>> Job job = Job.getInstance();
>> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
>> HadoopOutputFormat<Void, GenericRecord> parquetFormat =
>>     new HadoopOutputFormat<Void, GenericRecord>(new AvroParquetOutputFormat(), job);
>> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>>
>> df.output(parquetFormat);
>> env.execute();
>>
>>
>> Please suggest.
>>
>> Thanks,
>> Anuj
>>
>> On Mon, Dec 23, 2019 at 12:59 PM vino yang  wrote:
>>
>>> Hi Anuj,
>>>
>>> After searching in Github, I found a demo repository about how to use
>>> parquet in Flink.[1]
>>>
>>> You can have a look. I can not make sure whether it is helpful or not.
>>>
>>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>>
>>> Best,
>>> Vino
>>>
>>> aj wrote on Sat, Dec 21, 2019 at 7:03 PM:
>>>
>>>> Hello All,
>>>>
>>>> I am getting a set of events in JSON that I am dumping in the hourly
>>>> bucket in S3.
>>>> I am reading this hourly bucket and created a DataSet.
>>>>
>>>> I want to write this dataset as Parquet but I am not able to figure
>>>> out how. Can somebody help me with this?
>>>>
>>>>
>>>> Thanks,
>>>> Anuj


 

>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> 
>>
>>
>> 
>>
>


Re: Get consumer group offset

2020-01-08 Thread Arvid Heise
Hi Alex,

It seems like your message got lost over Christmas.

I don't completely understand the question. Do you mean that Flink does not
pick up the consumer group anymore?

Btw out of curiosity, why are you still running Kafka 0.10? We are thinking
about dropping support for older Kafka versions as we expect most users to
use newer versions now.

Best,

Arvid

On Tue, Dec 24, 2019 at 10:09 AM qq <471237...@qq.com> wrote:

> Hi all,
>
>I use Kafka 0.10.0 and Flink 1.9.0. Why can't I see the consumer group that
> I configured for the Flink Kafka 0.10 consumer? When I use a plain KafkaConsumer
> (without Flink) to consume the same topic, I can get the consumer group
> metadata. Thanks.
>
> Kafka/bin/kafka-run-class kafka.admin.ConsumerGroupCommand
> --bootstrap-server t4:9092,t5:9092,t6:9092 --new-consumer --list|grep
> consumer_group_1
>
>
>   AlexFu
>


Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Arvid Heise
Hi Jayant,

If you only see it sometimes, that indicates that the class is present in two
different versions of the connector on the classpath and that the class
loading order is non-deterministic. Could you post the classpath?

Btw, it's always good to add which Flink version you use.
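
One way to check for such duplicates is to ask the class loader for every
location from which it can see the class file. The following is a minimal
sketch using only the JDK; ClassLocator and its method names are
hypothetical helpers, not part of Flink. It uses the system class loader,
which is an assumption: inside a running job the relevant loader would rather
be the one that loaded the user code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ClassLocator {

    /** Converts a binary class name such as a.b.C$1 to the resource path a/b/C$1.class. */
    public static String toResourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns every location from which the given class loader can see the class file. */
    public static List<URL> locate(ClassLoader loader, String className) {
        try {
            return Collections.list(loader.getResources(toResourcePath(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass the fully qualified class name from the stack trace as the first argument.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        List<URL> hits = locate(ClassLoader.getSystemClassLoader(), name);
        for (URL hit : hits) {
            System.out.println(hit);
        }
        if (hits.size() > 1) {
            System.out.println("WARNING: " + name + " is visible from " + hits.size() + " locations");
        }
    }
}
```

More than one printed location for the shaded Netty class would support the
duplicate-version theory; exactly one location would point towards a
different cause.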

Best,

Arvid

On Wed, Jan 8, 2020 at 12:20 PM Jayant Ameta  wrote:

> Hi,
> I see the following error sometimes on my flink job, even though the class
> is present in my uber jar.
>
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
> at
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111)
> ... 17 common frames omitted Wrapped by:
> org.elasticsearch.ElasticsearchException: java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
> at
> org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
> at
> org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
> at
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
> at
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
> ... 23 frames truncated
>
>
> Jayant
>


Re: Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Chesnay Schepler
Could you clarify under what circumstances you see this issue? You say 
"sometimes"; is the job running normally and then failing due to this 
error? Does it happen when submitting/canceling/restarting a job etc.


On 08/01/2020 12:20, Jayant Ameta wrote:

Hi,
I see the following error sometimes on my flink job, even though the 
class is present in my uber jar.


java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1 
at 
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111) 
... 17 common frames omitted Wrapped by: 
org.elasticsearch.ElasticsearchException: 
java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1 
at 
org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325) 
at 
org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83) 
at 
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112) 
at 
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
at 
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
... 23 frames truncated



Jayant





Re: Flink group with time-windowed join

2020-01-08 Thread jeremyji
Hi Dawid,

I simplified my SQL; the original SQL is more complex and has an unnest
select like:

SELECT
a.account,
(SUM(a.value) + SUM(b.value)) as result,
TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
(SELECT
account,
value,
producer_timestamp
FROM
table1) a,
(SELECT
account,
value,
producer_timestamp
FROM
table2,
unnest(table2.row_array) as T(account, value)) b
WHERE
a.account = b.account AND
a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3'
MINUTE AND b.producer_timestamp
group by
a.account,
TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)

table2 has a column row_array which is an array of rows; each row has two
fields: account and value.
producer_timestamp is a time attribute and a column of table2.
BTW, my Flink version is 1.7.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Table API: Joining on Tables of Complex Types

2020-01-08 Thread Dawid Wysakowicz
Hi Andreas,

Converting your GenericRecords to Rows would definitely be the safest
option. You can check how it's done in
org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can
reuse the logic from there to write something like:

    DataSet<GenericRecord> dataset = ...

    dataset.map( /* convert GenericRecord to Row 
*/).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

Another thing you could try is to make sure that GenericRecord is seen
as an Avro type by Flink (Flink should understand that an Avro type is a
complex type):

    dataset.returns(new GenericRecordAvroTypeInfo(/*schema string*/))

Then the TableEnvironment should pick it up as a structured type and
flatten it automatically when registering the Table. Bear in mind that the
returns method is part of SingleInputUdfOperator, so you can apply it
right after some transformation, e.g. map/flatMap.
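
The "convert GenericRecord to Row" step boils down to copying named fields
into fixed positions, in schema order. The following is a minimal sketch of
that idea with plain JDK types only: a Map stands in for the GenericRecord
and an Object[] stands in for the Row, so none of this is Flink or Avro API,
and the field names are hypothetical. In a real map function the lookup would
go through the record's get method and the result would be written into a Row.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlattenRecord {

    /** Copies the named fields of a record into a positional array, in schema order. */
    public static Object[] toRow(Map<String, Object> record, List<String> fieldNames) {
        Object[] row = new Object[fieldNames.size()];
        for (int i = 0; i < row.length; i++) {
            // Stand-in for reading a field by name from a generic record.
            row[i] = record.get(fieldNames.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical record with two fields.
        Map<String, Object> user = new HashMap<>();
        user.put("userName", "alice");
        user.put("age", 30);

        Object[] row = toRow(user, Arrays.asList("userName", "age"));
        System.out.println(Arrays.toString(row));
    }
}
```

Once every field has a fixed position and a declared type, a join can refer
to the field directly instead of calling a UDF on an opaque object.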

Best,

Dawid


On 06/01/2020 18:03, Hailu, Andreas wrote:
>
> Hi David, thanks for getting back.
>
>  
>
> From what you’ve said, I think we’ll need to convert our GenericRecord
> into structured types – do you have any references or examples I can
> have a look at? If not, perhaps you could just show me a basic example
> of flattening a complex object with accessors into a Table of
> structured types. Or by structured types, did you mean Row?
>
>  
>
> *// *ah**
>
>  
>
> *From:* Dawid Wysakowicz 
> *Sent:* Monday, January 6, 2020 9:32 AM
> *To:* Hailu, Andreas [Engineering] ;
> user@flink.apache.org
> *Cc:* Richards, Adam S [Engineering] 
> *Subject:* Re: Table API: Joining on Tables of Complex Types
>
>  
>
> Hi Andreas,
>
> First of all I would highly recommend converting a non-structured
> types to structured types as soon as possible as it opens more
> possibilities to optimize the plan.
>
> Have you tried:
>
> Table users =
> batchTableEnvironment.fromDataSet(usersDataset).select("getField(f0,
> userName) as userName", "f0")
> Table other =
> batchTableEnvironment.fromDataSet(otherDataset).select("getField(f0,
> userName) as user", "f1")
>
> Table result = other.join(users, "user = userName")
>
> You could also check how the
> org.apache.flink.formats.avro.AvroRowDeserializationSchema class is
> implemented which internally converts an avro record to a structured Row.
>
> Hope this helps.
>
> Best,
>
> Dawid
>
> On 03/01/2020 23:16, Hailu, Andreas wrote:
>
> Hi folks,
>
>  
>
> I’m trying to join two Tables which are composed of complex types,
> Avro’s GenericRecord to be exact. I have to use a custom UDF to
> extract fields out of the record and I’m having some trouble on
> how to do joins on them as I need to call this UDF to read what I
> need. Example below:
>
>  
>
> batchTableEnvironment.registerFunction("getField", new
> GRFieldExtractor()); // GenericRecord field extractor
>
> Table users = batchTableEnvironment.fromDataSet(usersDataset); //
> Converting from some pre-existing DataSet
>
> Table otherDataset =
> batchTableEnvironment.fromDataSet(someOtherDataset);
>
> Table userNames = t.select("getField(f0, userName)"); // This is
> how the UDF is used, as GenericRecord is a complex type requiring
> you to invoke a get() method on the field you’re interested in.
> Here we get a get on field ‘userName’
>
>  
>
> I’d like to do something using the Table API similar to the query
> “SELECT * from otherDataset WHERE otherDataset.userName =
> users.userName”. How is this done?
>
>  
>
> Best,
>
> Andreas
>
>  
>

Re: Flink logging issue with logback

2020-01-08 Thread Maximilian Michels
Interesting that we came across this problem at the same time. We have 
observed this with Lyft's K8s operator which uses the Rest API for job 
submission, much like the Flink dashboard.


Note that you can restore the original stdout/stderr in your program:

  private static void restoreStdOutAndStdErr() {
    System.setOut(new PrintStream(
        new FileOutputStream(FileDescriptor.out)));
    System.setErr(new PrintStream(
        new FileOutputStream(FileDescriptor.err)));
  }

Just call restoreStdOutAndStdErr() before you start building the Flink 
job. Of course, this is just meant to be a workaround.


I think an acceptable solution is to always print upon execution. For 
the plan preview we may keep the existing behavior.


Cheers,
Max

On 07.01.20 17:39, Dawid Wysakowicz wrote:
A quick update. The suppression of stdout/stderr actually might soon be 
dropped, see: https://issues.apache.org/jira/browse/FLINK-15504


Best,

Dawid

On 07/01/2020 07:17, Yang Wang wrote:

Hi Bajaj,

I have tested just as you described, and found that the logs in the user 
class do not show up when
using ConsoleAppender. If using FileAppender instead, everything works 
well.


It is so weird and I have no idea how to debug it.
Best,
Yang

Bajaj, Abhinav wrote on Tue, Jan 7, 2020 at 4:28 AM:


Hi,

Thanks much for the responses.

Let me add some more details and clarify my question.

_Setup_

  * I used the WikipediaAnalysis example and added a log in main
method.

……

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment see =
      StreamExecutionEnvironment.getExecutionEnvironment();
  LOG.info("Info log for test");

  DataStream<WikipediaEditEvent> edits = see.addSource(new
      WikipediaEditsSource());

……

  * I am using the Flink 1.7.1 distribution and starting
jobmanager and taskmanager locally using the below commands –
  o ./bin/jobmanager.sh start-foreground
  o ./bin/taskmanager.sh start-foreground
  o Both jobmanager and taskmanager log in the console now
  o JVM options are correctly set and verified from jobmanager
& taskmanager logs

  * I submit the WikipediaAnalysis job from Flink dashboard and
checked the jobmanager logs

_Run 1_: Flink is using the default log4j logging

  * Jobmanager logs the added info log from the job
  o 2020-01-06 11:55:37,422 INFO wikiedits.WikipediaAnalysis -
Info log for test

_Run 2_: Flink is set up to use logback as suggested in the Flink
documentation here



  * Jobmanager does not log the added info log from the job

So, it seems there is a logging behavior difference between using
log4j & logback in Flink.

Is this expected or a known difference?

Thanks again,

Abhinav Bajaj

_PS_: Ahh. I see how my email was confusing the first time.
Hopefully this one is better :P

*From: *Dawid Wysakowicz <dwysakow...@apache.org>
*Date: *Monday, January 6, 2020 at 5:13 AM
*Cc: *"Bajaj, Abhinav" <abhinav.ba...@here.com>, "user@flink.apache.org"
<user@flink.apache.org>
*Subject: *Re: Flink logging issue with logback

Hi Bajaj,

I am not entirely sure what the actual issue is that you are seeking
help with, but let me comment on your observations.

Ad. 1

If you log to the console from the main method, this is expected
behavior in both cases (log4j, logback). Stdout is
overwritten for the execution of the main method.

If you log to a file, logs should work in both cases. I checked
that myself, and the logs did appear in the jobmanager logs
as long as they are emitted before env.execute(). I did observe
some weird behavior of the Web UI, though, as it does not always update
the logs that are displayed. How did you check the logs? If you
checked through the Web UI, could you try to check the file directly?

Ad. 2 Yes this is expected. Operators are executed on taskmanager
and that's why they log there.

Ad. 3 See Ad. 1

Best,

Dawid

On 06/01/2020 07:07, vino yang wrote:

Hi Bajaj,

>> Logs from main method(outside of job graph) do not show up
in jobmanager logs.

IMO, this is normal behavior.

Other ideas, please check the JVM options mentioned by Yang.

Best,

Vino

Yang Wang <danrtsey...@gmail.com> wrote on Mon, Jan 6, 2020 at 11:18 AM:

Hi Bajaj, Abhinav,

Could you share the start command of the jobmanager and
taskmanager? If they are started correctly, we
will have the following JVM options.

-Dlog.file=/path/of/taskmanager.log
-Dlogback.configurationFile=file:///path/of/logback.xml

   

Elasticsink sometimes gives NoClassDefFoundError

2020-01-08 Thread Jayant Ameta
Hi,
I see the following error sometimes on my flink job, even though the class
is present in my uber jar.

java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
at
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:111)
... 17 common frames omitted Wrapped by:
org.elasticsearch.ElasticsearchException: java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/elasticsearch5/shaded/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink$1
at
org.elasticsearch.transport.netty3.Netty3Transport.exceptionCaught(Netty3Transport.java:325)
at
org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:83)
at
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
at
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
... 23 frames truncated


Jayant


Re: Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread Benchao Li
hi sunfulin,

As Kurt pointed out, if you use the RocksDB state backend, your job may be
bound by slow disk IO.
You can check WindowOperator's latency metric to see how long it takes to
process an element.
Hope this helps.

sunfulin wrote on Wed, Jan 8, 2020 at 4:04 PM:

> Ah, I had checked resource usage and GC from the Flink dashboard. It seems
> that the cause is not a CPU or memory issue. Task heap memory usage is less
> than 30%. Could you kindly tell me how I can see more metrics to help locate
> the bottleneck?
> I'd really appreciate that.
>
>
>
>
>
> At 2020-01-08 15:59:17, "Kurt Young"  wrote:
>
> Hi,
>
> Could you try to find out what the bottleneck of your current job is?
> Different bottlenecks would lead to different optimizations, for example
> whether the job is CPU bound, or whether the local state is so big that the
> job is stuck on too many slow IOs.
>
> Best,
> Kurt
>
>
> On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:
>
>> hi sunfulin,
>> you can try the blink planner (since 1.9+), which optimizes distinct
>> aggregation. You can also try enabling
>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>
>> best,
>> godfreyhe
>>
>> On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:
>>
>>> Hi, community,
>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>> In one scenario I'm trying to count(distinct deviceID) over a data set of
>>> about 100GB in real time, and sink the aggregated results to an
>>> Elasticsearch index. I ran into a severe performance issue when running
>>> my Flink job and would like some help from the community.
>>>
>>>
>>> Flink version: 1.8.2
>>> Running on YARN with 4 slots per task manager. My Flink task
>>> parallelism is set to 10, which equals the number of my Kafka source
>>> partitions. After starting the job, I can observe high backpressure in
>>> the Flink dashboard. Any suggestions or help would be highly appreciated.
>>>
>>>
>>> The running SQL looks like the following:
>>>
>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt
>>> from (
>>>   SELECT
>>>     aggId,
>>>     pageId,
>>>     statkey,
>>>     COUNT(DISTINCT deviceId) as cnt
>>>   FROM (
>>>     SELECT
>>>       'ZL_005' as aggId,
>>>       'ZL_UV_PER_MINUTE' as pageId,
>>>       deviceId,
>>>       ts2Date(recvTime) as statkey
>>>     from kafka_zl_etrack_event_stream
>>>   )
>>>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>> ) as t1
>>> group by aggId, pageId, statkey
>>>
>>> Best
>>
>>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



Re: kafka: how to stop consumption temporarily

2020-01-08 Thread Arvid Heise
I'd second Chesnay's suggestion to use a custom source. It would be a piece
of cake with FLIP-27 [1], but we are not there yet unfortunately. It's
probably in Flink 1.11 (mid year) if you can wait.

The current way would be a source that wraps the two Kafka consumers and
blocks the normal consumer from outputting elements. Here is a quick and
dirty solution that I threw together:
https://gist.github.com/AHeise/d7a8662f091e5a135c5ccfd6630634dd .

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
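
To illustrate the blocking idea in isolation, here is a minimal sketch in
plain Java. It is not the code from the gist and it does not use Flink's
source API; `GatedMerger`, its two queues and the `dumpRunning` flag are
hypothetical names standing in for the two wrapped consumers. Dump records
are always forwarded, while CDC records are held back as long as a dump is
running:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a source wrapping a dump consumer and a CDC consumer. */
final class GatedMerger {
    private final Queue<String> dumpRecords = new ArrayDeque<>();
    private final Queue<String> cdcRecords = new ArrayDeque<>();
    private boolean dumpRunning = false;

    void startDump() { dumpRunning = true; }

    void finishDump() { dumpRunning = false; }

    void offerDump(String record) { dumpRecords.add(record); }

    void offerCdc(String record) { cdcRecords.add(record); }

    /** Returns every record that may be emitted right now, dump records first. */
    List<String> poll() {
        List<String> out = new ArrayList<>();
        // Dump records are never blocked.
        while (!dumpRecords.isEmpty()) {
            out.add(dumpRecords.poll());
        }
        // CDC records stay queued while a dump is in progress.
        if (!dumpRunning) {
            while (!cdcRecords.isEmpty()) {
                out.add(cdcRecords.poll());
            }
        }
        return out;
    }
}
```

In a real source one would probably stop polling the CDC consumer instead of
buffering its records in memory, since Kafka keeps them until they are read.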

On Mon, Jan 6, 2020 at 1:16 PM David Morin wrote:

> My naive solution can't work because a dump can be quite long.
> So, yes I have to find a way to stop the consumption from the topic used
> for streaming mode when a dump is done :(
> Terry, I try to implement something based on your reply and based on this
> thread
> https://stackoverflow.com/questions/59201503/flink-kafka-gracefully-close-flink-consuming-messages-from-kafka-source-after-a
> Any suggestions are welcomed
> thx.
>
> David
>
> On 2020/01/06 09:35:37, David Morin  wrote:
> > Hi,
> >
> > Thanks for your replies.
> > Yes Terry. You are right. I can try to create a custom source.
> > But perhaps, given my use case, I can use a technical field in my data.
> > This is a timestamp, and I think I just have to ignore late events, either
> > with watermarks or later in the pipeline based on metadata stored in the
> > Flink state. I am testing it now...
> > Thx
> >
> > David
> >
> > On 2020/01/03 15:44:08, Chesnay Schepler  wrote:
> > > Are you asking how to detect from within the job whether the dump is
> > > complete, or how to combine these 2 jobs?
> > >
> > > If you had a way to notice whether the dump is complete, then I would
> > > suggest creating a custom source that wraps 2 Kafka sources and
> > > switches between them at will based on your conditions.
> > >
> > >
> > > On 03/01/2020 03:53, Terry Wang wrote:
> > > > Hi,
> > > >
> > > > I’d like to share my opinion here. It seems that you need to adjust the
> > > > Kafka consumers so that they communicate with each other. When you begin
> > > > the dump process, you need to notify the other (CDC-topic) consumer to
> > > > wait idle.
> > > >
> > > >
> > > > Best,
> > > > Terry Wang
> > > >
> > > >
> > > >
> > > >> On Jan 2, 2020, at 16:49, David Morin wrote:
> > > >>
> > > >> Hi,
> > > >>
> > > >> Is there a way to temporarily stop consuming one Kafka source in
> > > >> streaming mode?
> > > >> Use case: I have to consume 2 topics, but one of them has higher
> > > >> priority.
> > > >> One of these topics is dedicated to ingesting data from the db (change
> > > >> data capture, CDC) and the other is dedicated to a synchronization (a
> > > >> dump, i.e. a SELECT ... from the db). At the moment the latter is
> > > >> performed by a separate Flink job, which we start after manually
> > > >> stopping the previous one (CDC).
> > > >> I want to merge these 2 modes and automatically stop consumption of
> > > >> the topic dedicated to the CDC mode when a dump is performed.
> > > >> How can that be handled with Flink in a streaming way? Backpressure? ...
> > > >> Thanks in advance for your insights
> > > >>
> > > >> David
> > > >
> > >
> > >
> >
>


Re: Migrate custom partitioner from Flink 1.7 to Flink 1.9

2020-01-08 Thread Arvid Heise
Hi Salva,

I already answered on SO [1], but I'll replicate it here:

With Flink 1.9, you cannot dynamically broadcast to all channels anymore.
Your StreamPartitioner has to statically specify if it's a broadcast with
isBroadcast. Then, selectChannel is never invoked.
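
To make the consequence concrete, here is a simplified model in plain Java.
It is only a sketch: `SimplePartitioner` and `RoutingModel` are hypothetical
stand-ins and not Flink's `StreamPartitioner` or record writer. It shows why a
partitioner that declares itself as broadcast never gets a per-record
decision:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a partitioner; NOT Flink's StreamPartitioner. */
interface SimplePartitioner<T> {
    boolean isBroadcast();

    int selectChannel(T record, int numChannels);
}

final class RoutingModel {
    /** Returns the indices of the channels a record is written to. */
    static <T> List<Integer> route(SimplePartitioner<T> partitioner, T record, int numChannels) {
        List<Integer> targets = new ArrayList<>();
        if (partitioner.isBroadcast()) {
            // Static decision: every channel, selectChannel is not consulted.
            for (int i = 0; i < numChannels; i++) {
                targets.add(i);
            }
        } else {
            targets.add(partitioner.selectChannel(record, numChannels));
        }
        return targets;
    }

    static <T> SimplePartitioner<T> broadcast() {
        return new SimplePartitioner<T>() {
            public boolean isBroadcast() { return true; }

            public int selectChannel(T record, int numChannels) {
                throw new UnsupportedOperationException("not called for broadcast");
            }
        };
    }

    static SimplePartitioner<String> byHash() {
        return new SimplePartitioner<String>() {
            public boolean isBroadcast() { return false; }

            public int selectChannel(String record, int numChannels) {
                return Math.floorMod(record.hashCode(), numChannels);
            }
        };
    }
}
```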

Do you have a specific use case, where you'd need to dynamically switch?

Best,

Arvid

[1]
https://stackoverflow.com/questions/59485064/migrating-custom-dynamic-partitioner-from-flink-1-7-to-flink-1-9

On Sat, Jan 4, 2020 at 7:00 AM Salva Alcántara wrote:

> Thanks Chesnay! Just to be clear, this is what my current code looks like:
>
> ```
> unionChannel = broadcastChannel.broadcast().union(singleChannel)
>
> result = new DataStream<>(
>     unionChannel.getExecutionEnvironment(),
>     new PartitionTransformation<>(unionChannel.getTransformation(), new MyDynamicPartitioner())
> )
> ```
>
> The problem when migrating to Flink 1.9 is that MyDynamicPartitioner cannot
> handle broadcasted elements as explained in the question description. So,
> based on your reply, I guess I could do something like this:
>
> ```
> resultSingleChannel = new DataStream<>(
>     singleChannel.getExecutionEnvironment(),
>     new PartitionTransformation<>(singleChannel.getTransformation(), new MyDynamicPartitioner())
> )
>
> result = broadcastChannel.broadcast().union(resultSingleChannel)
> ```
>
> I will give it a try and see if it works.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Session Window with dynamic gap

2020-01-08 Thread Aljoscha Krettek
Hi Kristoff,

There are no plans of adding state support to the gap extractors but you could 
do this using a two-step approach, i.e. have an operation in front of the 
window that keeps track of session gaps, enriches the message with the gap that 
should be used and then the extractor extracts that gap. This is a more modular 
approach compared to putting everything in one operator/extractor.
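
A rough sketch of that two-step shape in plain Java, under stated
assumptions: `EnrichedMessage`, `GapEnricher` and `GapFromMessage` are
hypothetical names, and the `HashMap` merely stands in for the keyed state
that a stateful function in front of the window would hold. Only the
`extract` method mirrors the shape of `SessionWindowTimeGapExtractor#extract`,
which receives the element and returns the gap:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical message type that carries the gap chosen for it. */
final class EnrichedMessage {
    final String key;
    final long sessionGapMillis;

    EnrichedMessage(String key, long sessionGapMillis) {
        this.key = key;
        this.sessionGapMillis = sessionGapMillis;
    }
}

/** Step 1: stateful enrichment in front of the window (the map stands in for keyed state). */
final class GapEnricher {
    private final Map<String, Long> gapPerKey = new HashMap<>();
    private final long defaultGapMillis;

    GapEnricher(long defaultGapMillis) {
        this.defaultGapMillis = defaultGapMillis;
    }

    void updateGap(String key, long gapMillis) {
        gapPerKey.put(key, gapMillis);
    }

    EnrichedMessage enrich(String key) {
        return new EnrichedMessage(key, gapPerKey.getOrDefault(key, defaultGapMillis));
    }
}

/** Step 2: stateless extraction, only reads what step 1 attached. */
final class GapFromMessage {
    static long extract(EnrichedMessage element) {
        return element.sessionGapMillis;
    }
}
```

The extractor stays free of state access, which is the restriction discussed
further down in this thread.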

Best,
Aljoscha

> On 3. Jan 2020, at 08:52, vino yang  wrote:
> 
> Hi KristoffSC,
> 
> >> Are there any plans to add support of Flink State into 
> >> SessionWindowTimeGapExtractor?
> 
> As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an 
> operator.
> 
> But I cannot give a clear answer. Let me ping @Aljoscha Krettek  to give the 
> answer.
> 
> Best,
> Vino
> 
> On Fri, Jan 3, 2020 at 6:17 AM KristoffSC wrote:
> Ok, 
> I did some more tests and yep, it seems that there is no way to use Flink's
> State in class that will implement SessionWindowTimeGapExtractor. 
> 
> Even if I will implement this interface on a class that is an operator,
> whenever extract method is called it does not have any access to Flink's
> state. Even calling getRuntimeContext() from it throws an exception.
> 
> Are there any plans to add support of Flink State into
> SessionWindowTimeGapExtractor?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Late outputs for Session Window

2020-01-08 Thread KristoffSC
Hi, 
thank you for your SO comment [1]. You are right. Sorry, I misunderstood
the concept of a "late message".
In fact, I was never sending late events that would fall into a window that
had just ended.

Thank you for your comments and clarification. 


[1]
https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Viewing Flink operator state

2020-01-08 Thread sunfulin
How can I view statistics such as the state storage size through the dashboard, if checkpointing is not enabled?

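Re: Suspected ParquetTableSource Filter Pushdown bug

2020-01-08 Thread Kurt Young
If the optimizer stays stuck and never exits, then it is almost certainly a bug. Please open an issue and upload this information there; we will investigate what is causing the problem.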

Best,
Kurt


On Wed, Jan 8, 2020 at 5:12 PM jun su  wrote:

> Adding the code as text:
>
> def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
>
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> val parquetTableSource: ParquetTableSource = ParquetTableSource
>   .builder
>   .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
>     org.apache.avro.Schema.parse(schema, true)))
>   .path("/Users/sujun/Documents/tmp/login_data")
>   .build
>
> tableEnv.registerTableSource("source",parquetTableSource)
>
>
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
> tableEnv.registerTable("t1",t1)
>
> val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
> t1.toAppendStream[Row].print()
>
> env.execute()
>
> }
>
>
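> On Wed, Jan 8, 2020 at 4:59 PM jun su wrote:
>
>> Hello,
>> While using ParquetTableSource I ran into a problem that looks like a bug
>> in the ParquetTableSource filter pushdown. The code and description follow:
>>
>> [image: 1578473593933.jpg]
>>
>> Debugging shows that the code is stuck in the method
>> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp: the while(true)
>> loop never exits, until the whole program runs out of memory (OOM).
>>
>> [image: 1.jpg]
>>
>> After removing the filter pushdown code from ParquetTableSource, the main
>> program runs. I suspect the cause is that the Calcite optimizer never
>> terminates while iterating to find the lowest-cost plan.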
>>
>


Re: Late outputs for Session Window

2020-01-08 Thread Arvid Heise
Hi Kristoff,

please check my SO comment and reply.

https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942

It's not entirely clear to me why it's not working but I also don't quite
understand your use case yet (data examples missing).

Best,

Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC wrote:

> After following the suggestion from SO I made a few changes, so now I'm
> using event time. Watermarks are progressing; I've checked them in Flink's
> metrics. The window operator is triggered, but I still don't see any late
> outputs for it.
>
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
> env.setParallelism(1);
> env.disableOperatorChaining();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
>
>
> DataStream<RawMessage> rawBusinessTransaction = env
>     .addSource(new FlinkKafkaConsumer<>("business",
>         new JSONKeyValueDeserializationSchema(false), properties))
>     .map(new KafkaTransactionObjectMapOperator())
>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
>
>         @Nullable
>         @Override
>         public Watermark getCurrentWatermark() {
>             return new Watermark(System.currentTimeMillis());
>         }
>
>         @Override
>         public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
>             return element.messageCreationTime;
>         }
>     })
>     .name("Kafka Transaction Raw Data Source.");
>
> messageStream
>     .keyBy(tradeKeySelector)
>     .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
>     .sideOutputLateData(lateTradeMessages)
>     .process(new CumulativeTransactionOperator())
>     .name("Aggregate Transaction Builder");
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Suspected ParquetTableSource Filter Pushdown bug

2020-01-08 Thread jun su
Adding the code as text:

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = StreamTableEnvironment.create(env)

val schema =
"{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
val parquetTableSource: ParquetTableSource = ParquetTableSource
  .builder
  .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
    org.apache.avro.Schema.parse(schema, true)))
  .path("/Users/sujun/Documents/tmp/login_data")
  .build

tableEnv.registerTableSource("source",parquetTableSource)


val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
tableEnv.registerTable("t1",t1)

val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
t1.toAppendStream[Row].print()

env.execute()

}


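On Wed, Jan 8, 2020 at 4:59 PM jun su wrote:

> Hello,
> While using ParquetTableSource I ran into a problem that looks like a bug
> in the ParquetTableSource filter pushdown. The code and description follow:
>
> [image: 1578473593933.jpg]
>
> Debugging shows that the code is stuck in the method
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp: the while(true)
> loop never exits, until the whole program runs out of memory (OOM).
>
> [image: 1.jpg]
>
> After removing the filter pushdown code from ParquetTableSource, the main
> program runs. I suspect the cause is that the Calcite optimizer never
> terminates while iterating to find the lowest-cost plan.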
>


Re: Using redis cache in flink

2020-01-08 Thread Yun Tang
Hi Navneeth

If you need the redis cache to be fault tolerant, I am afraid you have to
choose a redis cluster, since after a job failover Flink might deploy a task
on a node different from the previous one.

If you don't care about fault tolerance, you could implement a customized
operator which launches redis.

By the way, there is a way to combine on-heap in-memory objects with the
checkpoint mechanism to ensure fault tolerance; you could refer to [1] and [2].
The basic idea is to cac

[1] 
https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java#L147
[2] 
https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.java#L89



From: Navneeth Krishnan 
Sent: Wednesday, January 8, 2020 15:36
To: Yun Tang 
Cc: user 
Subject: Re: Using redis cache in flink

Hi Yun,

Thanks, the way I want to use redis is like a cache not as state backend. I 
would still have rocksdb state backend for other states. The reason to use 
cache instead of managed state is because I’d get around 10k msgs per task slot 
and I don’t have to get the state from rocksdb for each lookup. In memory cache 
would be fine but to rebuild the state I want to use redis.
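
A minimal sketch of such a read-through near cache in plain Java, to make the
idea concrete. The names are hypothetical: `NearCache` is not an existing
library class, and `remoteLookup` is just a function standing in for a Redis
GET so that the example does not depend on any Redis client. The local part is
a small LRU map; on a miss the value is fetched remotely and kept locally:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical read-through cache: local LRU map in front of a remote lookup. */
final class NearCache<K, V> {
    private final Function<K, V> remoteLookup; // stands in for a Redis GET
    private final Map<K, V> local;
    private int remoteCalls = 0;

    NearCache(int maxEntries, Function<K, V> remoteLookup) {
        this.remoteLookup = remoteLookup;
        // accessOrder = true turns the LinkedHashMap into an LRU structure.
        this.local = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    V get(K key) {
        V value = local.get(key);
        if (value == null) {
            // Local miss: go to the remote store and remember the result.
            value = remoteLookup.apply(key);
            remoteCalls++;
            if (value != null) {
                local.put(key, value);
            }
        }
        return value;
    }

    int remoteCalls() {
        return remoteCalls;
    }
}
```

The class is not thread-safe; the assumption is one instance per parallel
operator instance, so after a restart each instance starts empty and refills
from the remote store.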

Regards

On Tue, Jan 7, 2020 at 11:21 PM Yun Tang wrote:
Hi Navneeth

If you wrap redis as a state backend, you cannot easily share data across slots,
as Flink constructs a state backend per operator, accessed by the local thread only.

If you use a redis cluster as an external service to store your data, you
can share data across slots easily. However, while this reduces the cost of
serialization, the cost of the added network communication cannot be ignored.
There is a trade-off here, and we cannot be sure there would be a performance gain.
Actually, I would expect the time spent on CPU serialization to be much less than
the time consumed by the network.

Best
Yun Tang

From: Navneeth Krishnan
Sent: Wednesday, January 8, 2020 12:33
To: user
Subject: Using redis cache in flink

Hi All,

I want to use redis as a near/far cache to store data that is common across
slots, i.e. to share data across slots. This data is required for processing every
single message, and it seems better to keep it in an in-memory cache backed by redis
rather than in rocksdb, since with rocksdb it has to be serialized for every single
get call. Do you think this is a good solution, or is there a better one?
Also, is there any reference on how I can create a centralized near/far cache,
given that the job and operators are distributed by the job manager?

Thanks


Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin
Ah, I have checked resource usage and GC from the Flink dashboard. It seems the
cause is not a CPU or memory issue: task heap memory usage is less than 30%.
Could you kindly tell me how I can see more metrics to help locate the
bottleneck?
I would really appreciate that.








At 2020-01-08 15:59:17, "Kurt Young"  wrote:

Hi,


Could you try to find out what the bottleneck of your current job is?
Different bottlenecks lead to different optimizations, for example whether
the job is CPU bound, or whether the local state is so large that the job
is stuck on too many slow I/Os.


Best,

Kurt





On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

hi sunfulin,  
you can try the Blink planner (since 1.9+), which optimizes distinct
aggregation. You can also try enabling
table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In
one scenario I'm trying to count(distinct deviceID) over a data set of about
100GB in real time, and sink the aggregated results to an Elasticsearch index.
I ran into a severe performance issue when running my Flink job and would like
some help from the community.


Flink version: 1.8.2
Running on YARN with 4 slots per task manager. My Flink task parallelism
is set to 10, which equals the number of my Kafka source partitions. After
starting the job, I can observe high backpressure in the Flink dashboard. Any
suggestions or help would be highly appreciated.


The running SQL looks like the following:

INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt
from (
  SELECT
    aggId,
    pageId,
    statkey,
    COUNT(DISTINCT deviceId) as cnt
  FROM (
    SELECT
      'ZL_005' as aggId,
      'ZL_UV_PER_MINUTE' as pageId,
      deviceId,
      ts2Date(recvTime) as statkey
    from kafka_zl_etrack_event_stream
  )
  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) as t1
group by aggId, pageId, statkey

Best

Re:Re: Flink SQL Count Distinct performance optimization

2020-01-08 Thread sunfulin


hi, godfreyhe
As far as I can see, I have already rewritten the running SQL from a single-level
count distinct into a two-level aggregation, which is what the
table.optimizer.distinct-agg.split.enabled parameter does. Correct me if I am
wrong. However, the rewritten SQL does not perform well in terms of throughput.
For now I cannot use the Blink planner in my apps, because the current prod
environment is not planned or ready to be upgraded to Flink 1.9+.
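
For reference, the two-level idea can be sketched in plain Java, independent
of Flink. `SplitDistinct` and its method names are hypothetical and only
illustrate the arithmetic: each id lands in exactly one bucket (here
`hashCode` modulo the bucket count, as in the `MOD(hashCode(deviceId), 1024)`
of the query below), so summing the per-bucket distinct counts gives the same
result as one global distinct count:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of splitting COUNT(DISTINCT ...) into two levels. */
final class SplitDistinct {
    /** Single level: one global set of distinct ids. */
    static long countDistinct(List<String> ids) {
        return new HashSet<>(ids).size();
    }

    /** Two levels: distinct count per bucket, then the sum over all buckets. */
    static long countDistinctSplit(List<String> ids, int buckets) {
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String id : ids) {
            // floorMod keeps the bucket non-negative for negative hash codes.
            int bucket = Math.floorMod(id.hashCode(), buckets);
            perBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(id);
        }
        long total = 0;
        for (Set<String> distinctInBucket : perBucket.values()) {
            total += distinctInBucket.size();
        }
        return total;
    }
}
```

The split spreads the state and the work of a hot key over several buckets;
it does not reduce the total number of ids that have to be kept in state.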







At 2020-01-08 15:52:28, "贺小令"  wrote:

hi sunfulin,  
you can try the Blink planner (since 1.9+), which optimizes distinct
aggregation. You can also try enabling
table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


On Wed, Jan 8, 2020 at 3:39 PM sunfulin wrote:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In
one scenario I'm trying to count(distinct deviceID) over a data set of about
100GB in real time, and sink the aggregated results to an Elasticsearch index.
I ran into a severe performance issue when running my Flink job and would like
some help from the community.


Flink version: 1.8.2
Running on YARN with 4 slots per task manager. My Flink task parallelism
is set to 10, which equals the number of my Kafka source partitions. After
starting the job, I can observe high backpressure in the Flink dashboard. Any
suggestions or help would be highly appreciated.


The running SQL looks like the following:

INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as clkCnt
from (
  SELECT
    aggId,
    pageId,
    statkey,
    COUNT(DISTINCT deviceId) as cnt
  FROM (
    SELECT
      'ZL_005' as aggId,
      'ZL_UV_PER_MINUTE' as pageId,
      deviceId,
      ts2Date(recvTime) as statkey
    from kafka_zl_etrack_event_stream
  )
  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) as t1
group by aggId, pageId, statkey

Best
