RE: kafka partitions, data locality

2019-04-29 Thread Smirnov Sergey Vladimirovich (39833)
Hi Stefan,

Thanks for clarifying!
But it still remains an open question for me, because we use the keyBy method and
I did not find any public interface for key reassignment (something like
partitionCustom for DataStream).
As I heard, there is some internal mechanism with key groups and a mapping of
keys to groups. Is it supposed to become public?
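For illustration, what we can do today is partitionCustom on a DataStream - but
it returns a plain DataStream rather than a KeyedStream, so keyed state and
windows are not available downstream (a rough sketch; Transaction and clientId
are placeholder names, env and kafkaConsumer are assumed to exist):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Transaction> txs = env.addSource(kafkaConsumer);

// Custom physical routing is possible, but the result is not a KeyedStream:
DataStream<Transaction> repartitioned = txs.partitionCustom(
        (Partitioner<String>) (key, numPartitions) ->
                Math.floorMod(key.hashCode(), numPartitions),
        tx -> tx.clientId);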


Regards,
Sergey

From: Stefan Richter [mailto:s.rich...@ververica.com]
Sent: Friday, April 26, 2019 11:15 AM
To: Smirnov Sergey Vladimirovich (39833) 
Cc: Dawid Wysakowicz; Ken Krugler; user@flink.apache.org; d...@flink.apache.org
Subject: Re: kafka partitions, data locality

Hi Sergey,

The reason this is flagged as beta is actually less about stability and more
about the fact that it is supposed to be a "power user" feature, because bad
things can happen if your data is not 100% correctly partitioned in the same
way that Flink would partition it. This is why you should typically only use it
if the data was partitioned by Flink and you are very sure of what you are
doing: there is not really anything we can do at the API level to protect you
from mistakes in using this feature. Eventually some runtime exceptions might
show you that something is going wrong, but that is not exactly a good user
experience.
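
For reference, using the feature looks roughly like this (just a sketch;
Transaction, clientId, and amount are placeholder names, and env/kafkaConsumer
are assumed to exist):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Transaction> source = env.addSource(kafkaConsumer);

// Only safe if the data is already partitioned exactly as Flink's own keyBy
// would partition it (same key-group assignment), e.g. because an upstream
// Flink job produced it.
KeyedStream<Transaction, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(source, tx -> tx.clientId);

keyed.timeWindow(Time.minutes(10), Time.minutes(1))
     .maxBy("amount");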

On a different note, there is currently one open issue [1] to be aware of in
connection with this feature and operator chaining, but at the same time this
is something that should not be hard to fix for the next minor release.

Best,
Stefan

[1] 
https://issues.apache.org/jira/browse/FLINK-12296?focusedCommentId=16824945&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16824945


On 26. Apr 2019, at 09:48, Smirnov Sergey Vladimirovich (39833)
<s.smirn...@tinkoff.ru> wrote:

Hi,

Dawid, great, thanks!
Any plans to make it stable? 1.9?


Regards,
Sergey

From: Dawid Wysakowicz [mailto:dwysakow...@apache.org]
Sent: Thursday, April 25, 2019 10:54 AM
To: Smirnov Sergey Vladimirovich (39833) <s.smirn...@tinkoff.ru>; Ken Krugler
<kkrugler_li...@transpac.com>
Cc: user@flink.apache.org; d...@flink.apache.org
Subject: Re: kafka partitions, data locality

Hi Smirnov,
Actually there is a way to tell Flink that data is already partitioned. You can
try the reinterpretAsKeyedStream [1] method. I must warn you, though, that this
is an experimental feature.
Best,
Dawid
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
Hi Ken,

It’s a bad story for us: even for a small window we have tens of thousands of
events per job, with 10x that in peaks or even more. And the number of jobs is
expected to be high. So instead of N operations (our producer/consumer
mechanism), with the shuffle/resorting (the current Flink implementation) it
becomes N*ln(N) - with N on the order of 2*10^4, ln(N) is about 10, i.e. a
tenfold loss of execution speed!
For all: what should my next step be? Contribute to Apache Flink? File an issue
in the backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) <s.smirn...@tinkoff.ru>
Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
clientId and find the max, then the topology will have a partition/shuffle to 
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don’t know of any way to tell Flink that the data doesn’t need to be 
shuffled. There was a 
discussion<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html>
 about adding a keyByWithoutPartitioning a while back, but I don’t think that 
support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you to do
the same thing without too much custom code. In order to support windowing,
you’d use triggers to flush state/emit results.
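
Roughly like this (an untested sketch; Transaction is assumed to be a POJO with
clientId, amount, and partition fields, the last captured by your
deserialization schema - note this still uses a keyBy, just by Kafka partition
instead of clientId, so that MapState has a keyed context):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class MaxPerClient
        extends KeyedProcessFunction<Integer, Transaction, Tuple2<String, Long>> {

    private static final long FLUSH_INTERVAL_MS = 60_000;

    private transient MapState<String, Long> maxPerClient;

    @Override
    public void open(Configuration parameters) {
        // clientId -> running max, scoped to the current key (Kafka partition)
        maxPerClient = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("maxPerClient", String.class, Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = maxPerClient.get(tx.clientId);
        if (current == null || tx.amount > current) {
            maxPerClient.put(tx.clientId, tx.amount);
        }
        // The "trigger": an aligned processing-time timer that flushes once per
        // interval. Timers are deduplicated per key and timestamp, so this
        // registers at most one timer per interval.
        long now = ctx.timerService().currentProcessingTime();
        long nextFlush = now - (now % FLUSH_INTERVAL_MS) + FLUSH_INTERVAL_MS;
        ctx.timerService().registerProcessingTimeTimer(nextFlush);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Emit the current max per client and start a fresh "window".
        for (Map.Entry<String, Long> e : maxPerClient.entries()) {
            out.collect(Tuple2.of(e.getKey(), e.getValue()));
        }
        maxPerClient.clear();
    }
}

You'd wire it up with something like
source.keyBy(tx -> tx.partition).process(new MaxPerClient()).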

— Ken


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833)
<s.smirn...@tinkoff.ru> wrote:

Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I’ll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partition so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients’ payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition) [...]

RE: kafka partitions, data locality

2019-04-26 Thread Smirnov Sergey Vladimirovich (39833)
Hi,

Dawid, great, thanks!
Any plans to make it stable? 1.9?


Regards,
Sergey

From: Dawid Wysakowicz [mailto:dwysakow...@apache.org]
Sent: Thursday, April 25, 2019 10:54 AM
To: Smirnov Sergey Vladimirovich (39833); Ken Krugler
Cc: user@flink.apache.org; d...@flink.apache.org
Subject: Re: kafka partitions, data locality


Hi Smirnov,

Actually there is a way to tell Flink that data is already partitioned. You can
try the reinterpretAsKeyedStream [1] method. I must warn you, though, that this
is an experimental feature.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
Hi Ken,

It’s a bad story for us: even for a small window we have tens of thousands of
events per job, with 10x that in peaks or even more. And the number of jobs is
expected to be high. So instead of N operations (our producer/consumer
mechanism), with the shuffle/resorting (the current Flink implementation) it
becomes N*ln(N) - with N on the order of 2*10^4, ln(N) is about 10, i.e. a
tenfold loss of execution speed!
For all: what should my next step be? Contribute to Apache Flink? File an issue
in the backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) <s.smirn...@tinkoff.ru>
Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
clientId and find the max, then the topology will have a partition/shuffle to 
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don’t know of any way to tell Flink that the data doesn’t need to be 
shuffled. There was a 
discussion<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html>
 about adding a keyByWithoutPartitioning a while back, but I don’t think that 
support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you to do
the same thing without too much custom code. In order to support windowing,
you’d use triggers to flush state/emit results.

— Ken


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833)
<s.smirn...@tinkoff.ru> wrote:

Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I’ll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partition so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients’ payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition), and the task is to find the max transaction per client in
sliding windows. In map/reduce terms there is no need to shuffle data between
all topic consumers; at most it may be worth doing within each consumer, to
gain some speedup by increasing the number of executors over each partition’s
data.
And my question is how Flink will behave in this case. Does it shuffle all the
data, or does it have some settings to avoid these extra, unnecessary
shuffle/sorting operations?
Thanks in advance!


With best regards,
Sergey Smirnov

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



RE: Zeppelin

2019-04-26 Thread Smirnov Sergey Vladimirovich (39833)
Hi,

Dawid, great, thanks for answering.

Jeff,
Flink 1.8 with default settings, standalone cluster, one job node and three
task manager nodes.
zeppelin 0.9 config
checked "Connect to existing cluster"
host: 10.219.179.16
port: 6123
create simple notebook:
%flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

zeppelin logs:
2019-04-23 10:09:17,241 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.216.26.26:45588] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 2147549189 - discarded
2019-04-23 10:09:29,475 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.216.26.26:45624] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 2147549189 - discarded
flink:
org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_createInterpreter(RemoteInterpreterService.java:189)
        at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.createInterpreter(RemoteInterpreterService.java:172)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter$2.call(RemoteInterpreter.java:169)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter$2.call(RemoteInterpreter.java:165)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.callRemoteFunction(RemoteInterpreterProcess.java:118)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.internal_create(RemoteInterpreter.java:165)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:132)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:290)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:443)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:181)
        at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:123)
        at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:187)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
...

Regards,
Sergey

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, April 25, 2019 4:24 PM
To: Dawid Wysakowicz 
Cc: Smirnov Sergey Vladimirovich (39833); user@flink.apache.org
Subject: Re: Zeppelin

Thanks Dawid,

Hi Sergey,

I am working on updating the Flink interpreter of Zeppelin to support Flink 1.9
(supposed to be released this summer).
For the current Flink interpreter of Zeppelin 0.9, I haven't verified it
against Flink 1.8. Could you show the full interpreter log? And what is the
size of your input file?



Dawid Wysakowicz <dwysakow...@apache.org> wrote on Thursday, April 25, 2019 at
6:31 PM:

Hi Sergey,

I am not very familiar with Zeppelin, but I know Jeff (cc'ed) is working on
integrating Flink with some notebooks. He might be able to help you.

Best,

Dawid
On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
Hello,

Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster deployed
in standalone mode.
Got the same error as described here:
https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
Would appreciate any support in resolving this problem.

Regards,
Sergey



--
Best Regards

Jeff Zhang


Zeppelin

2019-04-25 Thread Smirnov Sergey Vladimirovich (39833)
Hello,

Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster deployed
in standalone mode.
Got the same error as described here:
https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
Would appreciate any support in resolving this problem.

Regards,
Sergey



RE: kafka partitions, data locality

2019-04-19 Thread Smirnov Sergey Vladimirovich (39833)
Hi Ken,

It’s a bad story for us: even for a small window we have tens of thousands of
events per job, with 10x that in peaks or even more. And the number of jobs is
expected to be high. So instead of N operations (our producer/consumer
mechanism), with the shuffle/resorting (the current Flink implementation) it
becomes N*ln(N) - with N on the order of 2*10^4, ln(N) is about 10, i.e. a
tenfold loss of execution speed!
For all: what should my next step be? Contribute to Apache Flink? File an issue
in the backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) 
Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic, to group by 
clientId and find the max, then the topology will have a partition/shuffle to 
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don’t know of any way to tell Flink that the data doesn’t need to be 
shuffled. There was a 
discussion<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html>
 about adding a keyByWithoutPartitioning a while back, but I don’t think that 
support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you to do
the same thing without too much custom code. In order to support windowing,
you’d use triggers to flush state/emit results.

— Ken


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833)
<s.smirn...@tinkoff.ru> wrote:

Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I’ll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partition so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients’ payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition), and the task is to find the max transaction per client in
sliding windows. In map/reduce terms there is no need to shuffle data between
all topic consumers; at most it may be worth doing within each consumer, to
gain some speedup by increasing the number of executors over each partition’s
data.
And my question is how Flink will behave in this case. Does it shuffle all the
data, or does it have some settings to avoid these extra, unnecessary
shuffle/sorting operations?
Thanks in advance!


With best regards,
Sergey Smirnov

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



kafka partitions, data locality

2019-04-17 Thread Smirnov Sergey Vladimirovich (39833)
Hello,

We are planning to use Apache Flink as a core component of our new streaming
system for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions
that arises during that work is how Flink handles data locality.
I'll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partition so that the handler (or a job
worker) consuming messages from a partition has all the necessary information
for further processing.
As an example, say we have clients' payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition), and the task is to find the max transaction per client in
sliding windows. In map/reduce terms there is no need to shuffle data between
all topic consumers; at most it may be worth doing within each consumer, to
gain some speedup by increasing the number of executors over each partition's
data.
And my question is how Flink will behave in this case. Does it shuffle all the
data, or does it have some settings to avoid these extra, unnecessary
shuffle/sorting operations?
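
For illustration, the topology in question looks roughly like this (a sketch;
Transaction, TransactionSchema, and kafkaProps are placeholder names):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Transaction> txs = env.addSource(
        new FlinkKafkaConsumer<>("payments", new TransactionSchema(), kafkaProps));

txs.keyBy(tx -> tx.clientId)                       // this step repartitions (shuffles) the stream
   .timeWindow(Time.minutes(10), Time.minutes(1))  // sliding window: size, slide
   .maxBy("amount")
   .print();

env.execute("max transaction per client");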
Thanks in advance!


With best regards,
Sergey Smirnov