Re: Cluster failure after zookeeper glitch.

2017-01-19 Thread Andrew Ge Wu
2017-01-19 11:52:14,488 [myid:2] - INFO  
[WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
(n.state), 1 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
2017-01-19 11:52:14,489 [myid:2] - INFO  
[WorkerReceiver[myid=2]:FastLeaderElection@597] - Notification: 1 (message 
format version), 4 (n.leader), 0x400cb (n.zxid), 0x4 (n.round), LOOKING 
(n.state), 4 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state)
2017-01-19 11:52:14,719 [myid:2] - INFO  
[QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@617] - Established 
session 0x159b505820a0009 with negotiated timeout 4 for client 
/172.27.163.227:39302

I can’t say for sure whether the data in ZooKeeper was corrupted at that time. I 
guess Flink is somewhat sensitive to that?


Thanks



Andrew





> On 19 Jan 2017, at 14:19, Stefan Richter  wrote:
> 
> Hi,
> 
> I think depending on your configuration of Flink (are you using high 
> availability mode?) and the type of ZK glitches we are talking about, it can 
> very well be that some of Flink’s meta data in ZK got corrupted and the 
> system can no longer operate. But for a deeper analysis, we would need more 
> details about your configuration and the ZK problem.
> 
> Best,
> Stefan
> 
>> Am 19.01.2017 um 13:16 schrieb Andrew Ge Wu :
>> 
>> Hi,
>> 
>> 
>> We recently had several ZooKeeper glitches; when that happens, it seems to 
>> take the Flink cluster down with it.
>> 
>> We are running on 1.0.3
>> 
>> It started like this:
>> 
>> 
>> 2017-01-19 11:52:13,047 INFO  org.apache.zookeeper.ClientCnxn
>>- Unable to read additional data from server sessionid 
>> 0x159b505820a0008, likely server has closed socket, closing socket 
>> connection and attempting reconnect
>> 2017-01-19 11:52:13,047 INFO  org.apache.zookeeper.ClientCnxn
>>- Unable to read additional data from server sessionid 
>> 0x159b505820a0009, likely server has closed socket, closing socket 
>> connection and attempting reconnect
>> 2017-01-19 11:52:13,151 INFO  
>> org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: SUSPENDED
>> 2017-01-19 11:52:13,151 INFO  
>> org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: SUSPENDED
>> 2017-01-19 11:52:13,166 WARN  
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
>> ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not 
>> monitored (temporarily).
>> 2017-01-19 11:52:13,169 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- JobManager akka://flink/user/jobmanager#1976923422 was 
>> revoked leadership.
>> 2017-01-19 11:52:13,179 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- op1 -> (Map, 
>> Map -> op2) (18/24) (5336dd375eb12616c5a0e93c84f93465) switched from RUNNING 
>> to FAILED
>> 
>> 
>> 
>> Then our web-ui stopped serving and job manager stuck in an exception loop 
>> like this:
>> 2017-01-19 13:05:13,521 WARN  org.apache.flink.runtime.jobmanager.JobManager 
>>- Discard message 
>> LeaderSessionMessage(0318ecf5-7069-41b2-a793-2f24bdbaa287,01/19/2017 
>> 13:05:13 Job execution switched to status RESTARTING.) because the 
>> expected leader session ID None did not equal the received leader session ID 
>> Some(0318ecf5-7069-41b2-a793-2f24bdbaa287).
>> 2017-01-19 13:05:13,521 INFO  
>> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy  - 
>> Delaying retry of job execution for x ms …
>> 
>> 
>> Is it because we misconfigured something, or is this expected behavior? When 
>> this happens we have to restart the cluster to bring it back.
>> 
>> 
>> Thanks!
>> 
>> 
>> Andrew
>> -- 
>> Confidentiality Notice: This e-mail transmission may contain confidential 
>> or legally privileged information that is intended only for the individual 
>> or entity named in the e-mail address. If you are not the intended 
>> recipient, you are hereby notified that any disclosure, copying, 
>> distribution, or reliance upon the contents of this e-mail is strictly 
>> prohibited and may be unlawful. If you have received this e-mail in error, 
>> please notify the sender immediately by return e-mail and delete all copies 
>> of this message.
> 


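Stefan's question above about high-availability mode refers to Flink's ZooKeeper recovery settings. For the 1.0.x line the user is running, the relevant flink-conf.yaml keys look roughly like this (a hedged sketch from memory of the 1.0-era docs; hostnames and paths are placeholders, and these keys were renamed to high-availability.* in later releases):

```yaml
# flink-conf.yaml (Flink 1.0.x-era key names; verify against your version's docs)
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# Root znode under which Flink keeps leader info and submitted job graphs --
# this is the metadata Stefan suggests may have been corrupted by the glitch.
recovery.zookeeper.path.root: /flink
# Durable storage for job metadata; ZooKeeper only holds pointers to it.
recovery.zookeeper.storageDir: hdfs:///flink/recovery
```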


Cluster failure after zookeeper glitch.

2017-01-19 Thread Andrew Ge Wu
Hi,


We recently had several ZooKeeper glitches; when that happens, it seems to take 
the Flink cluster down with it.

We are running on 1.0.3

It started like this:


2017-01-19 11:52:13,047 INFO  org.apache.zookeeper.ClientCnxn   
- Unable to read additional data from server sessionid 
0x159b505820a0008, likely server has closed socket, closing socket connection 
and attempting reconnect
2017-01-19 11:52:13,047 INFO  org.apache.zookeeper.ClientCnxn   
- Unable to read additional data from server sessionid 
0x159b505820a0009, likely server has closed socket, closing socket connection 
and attempting reconnect
2017-01-19 11:52:13,151 INFO  
org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
  - State change: SUSPENDED
2017-01-19 11:52:13,151 INFO  
org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
  - State change: SUSPENDED
2017-01-19 11:52:13,166 WARN  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not 
monitored (temporarily).
2017-01-19 11:52:13,169 INFO  org.apache.flink.runtime.jobmanager.JobManager
- JobManager akka://flink/user/jobmanager#1976923422 was revoked 
leadership.
2017-01-19 11:52:13,179 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- op1 -> (Map, 
Map -> op2) (18/24) (5336dd375eb12616c5a0e93c84f93465) switched from RUNNING to 
FAILED



Then our web-ui stopped serving and job manager stuck in an exception loop like 
this:
2017-01-19 13:05:13,521 WARN  org.apache.flink.runtime.jobmanager.JobManager
- Discard message 
LeaderSessionMessage(0318ecf5-7069-41b2-a793-2f24bdbaa287,01/19/2017 13:05:13   
  Job execution switched to status RESTARTING.) because the expected leader 
session ID None did not equal the received leader session ID 
Some(0318ecf5-7069-41b2-a793-2f24bdbaa287).
2017-01-19 13:05:13,521 INFO  
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy  - 
Delaying retry of job execution for x ms …


Is it because we misconfigured something, or is this expected behavior? When 
this happens we have to restart the cluster to bring it back.


Thanks!


Andrew


Re: Can not stop cluster gracefully

2016-11-22 Thread Andrew Ge Wu
Thanks, I’ll give that a try.
> On 22 Nov 2016, at 12:18, Maximilian Michels  wrote:
> 
> The stop script relies on a file in the /tmp directory (location can
> be changed by setting env.pid.dir in the Flink config). If that file
> somehow gets cleanup up occasionally, the stop script can't find the
> process identifiers inside that file to kill the processes.
> 
> Another explanation could be that you're running the stop script from
> a different user. The file contains the user name of the user who
> started the cluster and the stop script looks for the same name again.
> 
> -Max
> 
> 
> On Tue, Nov 22, 2016 at 11:27 AM, Andrew Ge Wu  wrote:
>> Hi all,
>> 
>> 
>> You may have hit this problem before: from time to time when I run the 
>> stop-cluster script, I get this
>> 
>>> No taskmanager daemon to stop on host app25
>>> No taskmanager daemon to stop on host app26
>>> No taskmanager daemon to stop on host app27
>>> No taskmanager daemon to stop on host app83
>>> No taskmanager daemon to stop on host app84
>>> No taskmanager daemon to stop on host app85
>>> No taskmanager daemon to stop on host app86
>>> No taskmanager daemon to stop on host app87
>>> No jobmanager daemon to stop on host app23
>>> No jobmanager daemon to stop on host app24
>> 
>> 
>> But my cluster is actually running…
>> Without the tool, the only choice I have is to terminate those processes on 
>> each server manually.
>> 
>> Does anyone know if this is a setup issue, or a way to solve it?
>> 
>> 
>> Thanks!
>> 
>> 
>> 
>> Andrew
>> 
>> 


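As Max explains above, stop-cluster.sh looks up process IDs from a PID file that defaults to a directory under /tmp; if a tmp cleaner removes the file, the script has nothing to kill. A hedged sketch of the workaround (the exact default path and PID file naming vary by version):

```yaml
# flink-conf.yaml: move Flink's PID files out of /tmp so periodic tmp
# cleaners (tmpwatch, systemd-tmpfiles) cannot delete them while the
# cluster is running. The directory must be writable by the user who
# runs start-cluster.sh, and the same user must run stop-cluster.sh,
# since the stop script matches on that user name.
env.pid.dir: /var/run/flink
```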


Can not stop cluster gracefully

2016-11-22 Thread Andrew Ge Wu
Hi all,


You may have hit this problem before: from time to time when I run the 
stop-cluster script, I get this

> No taskmanager daemon to stop on host app25
> No taskmanager daemon to stop on host app26
> No taskmanager daemon to stop on host app27
> No taskmanager daemon to stop on host app83
> No taskmanager daemon to stop on host app84
> No taskmanager daemon to stop on host app85
> No taskmanager daemon to stop on host app86
> No taskmanager daemon to stop on host app87
> No jobmanager daemon to stop on host app23
> No jobmanager daemon to stop on host app24


But my cluster is actually running…
Without the tool, the only choice I have is to terminate those processes on 
each server manually.

Does anyone know if this is a setup issue, or a way to solve it?


Thanks!



Andrew




automatically submit a job to a HA cluster

2016-11-07 Thread Andrew Ge Wu
Hi,

We have a streaming job we want to submit to an HA cluster via Jenkins.
Recently we had downtime on one of the master nodes and restarted it; it seems 
the backup master became the leader, and submitting to the original master 
does not do anything.

Currently we use the command line on one of the masters to cancel the old job 
and submit the new one when deploying.
Does anyone know if it is possible to submit a job to the current acting master 
in the cluster? And is it possible to detach from execution gracefully, since 
it is a streaming job?


Thanks very much!


Andrew
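For the deployment question above: when the cluster runs in ZooKeeper recovery mode, the flink CLI on a machine that carries the cluster's flink-conf.yaml can look up the current leader instead of targeting a fixed master address, and a streaming job can be submitted detached so the CLI returns right after submission. A hedged sketch of such a Jenkins step (flag behavior and paths are assumptions to check against `bin/flink --help` for your version):

```
# Run on a host whose conf/flink-conf.yaml carries the ZooKeeper recovery
# settings, so the CLI resolves whichever JobManager is currently leader.
bin/flink list                           # find the running job's id
bin/flink cancel <jobId>                 # stop the old streaming job
bin/flink run -d new-streaming-job.jar   # -d: detach after submission
```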

Evolution algorithm on flink

2016-10-13 Thread Andrew Ge Wu
Hi guys,

I just published my code to Maven Central, open source of course.
I tried to make it as generic as possible.
If you are interested, please try it out and help me improve it!
https://github.com/CircuitWall/machine-learning




Thanks!


Andrew

Re: Release notes 1.1.0?

2016-08-10 Thread Andrew Ge Wu

Thanks Stephan for the explanation, and everyone involved. You guys are awesome!
I’ll wait for the next great release.


cheers!

Andrew


> On 10 Aug 2016, at 16:01, Stephan Ewen  wrote:
> 
> Hi!
> 
> In the above example the keySelector would run once before and once inside 
> the window operator. In that sense, the version below is a better way to do 
> it.
> 
> You can also create windows of 50 or max 100 ms by writing your own trigger. 
> Have a look at the count trigger. You can augment it by scheduling a time 
> callback for 100ms to trigger the window.
> https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
>  
> <https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java>
> 
> 
> The better version of the "random key" program:
> 
> stream
> .map(new MapFunction<SocialData, Tuple2<Integer, SocialData>>() {
> private int key;
> 
> @Override
> public Tuple2<Integer, SocialData> map(SocialData data) {
> if (++key >= 24) {
> key = 0;
> }
> return new Tuple2<>(key, data);
> }
> })
> .keyBy(0)
> .timeWindow(Time.milliseconds(100))
> .apply(...)
> 
> 
> Greetings,
> Stephan
> 
> 
> 
> On Wed, Aug 10, 2016 at 3:54 PM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Stephan
> 
> Thanks for the explanation! We will stick to 1.0.3 to keep our code clean.
> In the workaround case, how is the key selector instantiated? One instance 
> per window operator?
> By the way, is there a way to create a hybrid window of count and time, like 
> 50 items or a max processing time of 100 ms?
> 
> 
> Thanks!
> 
> Andrew
>> On 10 Aug 2016, at 15:33, Stephan Ewen > <mailto:se...@apache.org>> wrote:
>> 
>> Hi Andrew!
>> 
>> Here is the reason for what is happening with your job: 
>> 
>> You have used some sort of undocumented and unofficial corner case behavior 
>> of Flink 1.0.0, namely, using parallel windowAll().
>> Initially, windowAll() was supposed to not be parallel, but the system did 
>> not prevent to set a parallelism.
>> 
>> In Flink 1.0.0 it just happened that a parallel windowAll() behaved like a 
>> "window over stream partition".
>> In Flink 1.1.0, the parallel windowAll() really sends all data to one of the 
>> parallel operators, and the others are idle. Admittedly, Flink 1.1.0 should 
>> simply not allow to set a parallelism on windowAll() - we will fix that.
>> 
>> What we need to figure out now is how to have an adequate replacement for 
>> the "window over stream partition" use case. I think we need to add an 
>> explicit "windowPartition()" function for that case.
>> 
>> Until then, you could stay on Flink 1.0.3 or you can try and use instead of 
>> "windowAll()" a "keyBy().window()" operator and use an incrementing 
>> number%24 as a key (would not be perfectly balanced, but a temporary 
>> workaround):
>> 
>> stream
>> .keyBy(new KeySelector<SocialData, Integer>() {
>> private int key;
>> 
>> @Override
>> public Integer getKey(SocialData data) {
>> if (++key >= 24) {
>> key = 0;
>> }
>> return key;
>> }
>> })
>> .timeWindow(Time.milliseconds(100))
>> .apply(...)
>> 
>> 
>> Sorry for the inconvenience!
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> On Wed, Aug 10, 2016 at 1:15 PM, Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
>> Hi Aljoscha
>> 
>> We are not using state backend explicitly, recovery and state backend are 
>> pointed to file path.
>> See attached json file 
>> 
>> 
>> Thanks for the help.
>> 
>> 
>> Best regards
>> 
>> 
>> Andrew
>> 
>>> On 10 Aug 2016
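Stephan's suggestion above, a count trigger augmented with a 100 ms processing-time callback, boils down to the firing condition below. This is a plain-Java sketch with the clock passed in explicitly, not Flink's Trigger API (a real trigger would keep the count in partitioned state and register the timer via the TriggerContext); the class name is mine:

```java
public class CountOrTimeFiring {
    private final int maxCount;     // e.g. 50 items
    private final long maxDelayMs;  // e.g. 100 ms

    private int count;
    private long windowStartMs = -1;

    public CountOrTimeFiring(int maxCount, long maxDelayMs) {
        this.maxCount = maxCount;
        this.maxDelayMs = maxDelayMs;
    }

    /** Called per element; returns true if the pane should fire (and resets it). */
    public boolean onElement(long nowMs) {
        if (windowStartMs < 0) {
            windowStartMs = nowMs;  // first element opens the pane
        }
        count++;
        return maybeFire(nowMs);
    }

    /** Called from the scheduled processing-time callback. */
    public boolean onTimer(long nowMs) {
        return maybeFire(nowMs);
    }

    private boolean maybeFire(long nowMs) {
        boolean timeUp = windowStartMs >= 0 && nowMs - windowStartMs >= maxDelayMs;
        if (count >= maxCount || (timeUp && count > 0)) {
            // FIRE_AND_PURGE semantics: the next element starts a fresh pane.
            count = 0;
            windowStartMs = -1;
            return true;
        }
        return false;
    }
}
```

Whichever condition is reached first (50 items or 100 ms since the first buffered element) fires the pane, which matches the hybrid count/time window Andrew asked about.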

Re: Release notes 1.1.0?

2016-08-10 Thread Andrew Ge Wu
Hi Stephan

Thanks for the explanation! We will stick to 1.0.3 to keep our code clean.
In the workaround case, how is the key selector instantiated? One instance per 
window operator?
By the way, is there a way to create a hybrid window of count and time, like 50 
items or a max processing time of 100 ms?


Thanks!

Andrew
> On 10 Aug 2016, at 15:33, Stephan Ewen  wrote:
> 
> Hi Andrew!
> 
> Here is the reason for what is happening with your job: 
> 
> You have used some sort of undocumented and unofficial corner case behavior 
> of Flink 1.0.0, namely, using parallel windowAll().
> Initially, windowAll() was supposed to not be parallel, but the system did 
> not prevent to set a parallelism.
> 
> In Flink 1.0.0 it just happened that a parallel windowAll() behaved like a 
> "window over stream partition".
> In Flink 1.1.0, the parallel windowAll() really sends all data to one of the 
> parallel operators, and the others are idle. Admittedly, Flink 1.1.0 should 
> simply not allow to set a parallelism on windowAll() - we will fix that.
> 
> What we need to figure out now is how to have an adequate replacement for the 
> "window over stream partition" use case. I think we need to add an explicit 
> "windowPartition()" function for that case.
> 
> Until then, you could stay on Flink 1.0.3 or you can try and use instead of 
> "windowAll()" a "keyBy().window()" operator and use an incrementing number%24 
> as a key (would not be perfectly balanced, but a temporary workaround):
> 
> stream
> .keyBy(new KeySelector<SocialData, Integer>() {
> private int key;
> 
> @Override
> public Integer getKey(SocialData data) {
> if (++key >= 24) {
> key = 0;
> }
> return key;
> }
> })
> .timeWindow(Time.milliseconds(100))
> .apply(...)
> 
> 
> Sorry for the inconvenience!
> 
> Greetings,
> Stephan
> 
> 
> 
> On Wed, Aug 10, 2016 at 1:15 PM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Aljoscha
> 
> We are not using state backend explicitly, recovery and state backend are 
> pointed to file path.
> See attached json file 
> 
> 
> Thanks for the help.
> 
> 
> Best regards
> 
> 
> Andrew
> 
>> On 10 Aug 2016, at 11:38, Aljoscha Krettek > <mailto:aljos...@apache.org>> wrote:
>> 
>> Oh, are you by any chance specifying a custom state backend for your job? 
>> For example, RocksDBStateBackend.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Wed, 10 Aug 2016 at 11:17 Aljoscha Krettek > <mailto:aljos...@apache.org>> wrote:
>> Hi,
>> could you maybe send us the output of "env.getExecutionPlan()". This would 
>> help us better understand which operators are used exactly. (You can of 
>> course remove any security sensitive stuff.)
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Tue, 9 Aug 2016 at 15:30 Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
>> Oh sorry, I missed that part. No, I’m not explicitly setting that.
>> 
>> 
>>> On 09 Aug 2016, at 15:29, Aljoscha Krettek >> <mailto:aljos...@apache.org>> wrote:
>>> 
>>> Hi,
>>> are you setting a StreamTimeCharacteristic, i.e. 
>>> env.setStreamTimeCharacteristic?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Tue, 9 Aug 2016 at 14:52 Andrew Ge Wu >> <mailto:andrew.ge...@eniro.com>> wrote:
>>> Hi Aljoscha
>>> 
>>> 
>>> Plan attached, there are split streams and union operations around, but 
>>> here is how windows are created
>>> 
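Stephan's workaround above cycles a counter through 0..23 as an artificial key. A self-contained plain-Java check (no Flink dependencies; class names are mine) that the cycling selector spreads elements exactly evenly across the 24 key values; note that keyBy() then hashes these keys onto parallel subtasks, which is why Stephan says the result is still not perfectly balanced:

```java
import java.util.HashMap;
import java.util.Map;

public class RoundRobinKeyDemo {
    // Mirrors the stateful key logic from the workaround: cycle through 0..23.
    static class RoundRobinKey {
        private int key;

        int next() {
            if (++key >= 24) {
                key = 0;
            }
            return key;
        }
    }

    public static void main(String[] args) {
        RoundRobinKey selector = new RoundRobinKey();
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 240; i++) {
            counts.merge(selector.next(), 1, Integer::sum);
        }
        // 240 elements over 24 cycling keys: each key value receives exactly 10.
        for (int c : counts.values()) {
            if (c != 10) {
                throw new AssertionError("unbalanced key count: " + c);
            }
        }
        System.out.println("keys=" + counts.size() + ", perKey=10");
    }
}
```

This also answers the instantiation question: the selector is stateful, so balance only holds per selector instance; each parallel instance cycles its own counter independently.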

Re: Release notes 1.1.0?

2016-08-10 Thread Andrew Ge Wu
Hi Aljoscha

We are not using a state backend explicitly; recovery and the state backend are 
pointed to a file path.
See attached json file 

Thanks for the help.


Best regards


Andrew

> On 10 Aug 2016, at 11:38, Aljoscha Krettek  wrote:
> 
> Oh, are you by any chance specifying a custom state backend for your job? For 
> example, RocksDBStateBackend.
> 
> Cheers,
> Aljoscha
> 
> On Wed, 10 Aug 2016 at 11:17 Aljoscha Krettek  <mailto:aljos...@apache.org>> wrote:
> Hi,
> could you maybe send us the output of "env.getExecutionPlan()". This would 
> help us better understand which operators are used exactly. (You can of 
> course remove any security sensitive stuff.)
> 
> Cheers,
> Aljoscha
> 
> On Tue, 9 Aug 2016 at 15:30 Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Oh sorry, I missed that part. No, I’m not explicitly setting that.
> 
> 
>> On 09 Aug 2016, at 15:29, Aljoscha Krettek > <mailto:aljos...@apache.org>> wrote:
>> 
>> Hi,
>> are you setting a StreamTimeCharacteristic, i.e. 
>> env.setStreamTimeCharacteristic?
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Tue, 9 Aug 2016 at 14:52 Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
>> Hi Aljoscha
>> 
>> 
>> Plan attached, there are split streams and union operations around, but here 
>> is how windows are created
>> 
>> 
>> Let me know if I’m doing something out of the ordinary here.
>> 
>> 
>> 
>> Thanks!
>> 
>> 
>> Andrew
>>> On 09 Aug 2016, at 14:18, Aljoscha Krettek >> <mailto:aljos...@apache.org>> wrote:
>>> 
>> 
>>> Hi,
>>> could you maybe post how exactly you specify the window? Also, did you set 
>>> a "stream time characteristic", for example EventTime?
>>> 
>>> That could help us pinpoint the problem.
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>> 
>>> On Tue, 9 Aug 2016 at 12:42 Andrew Ge Wu >> <mailto:andrew.ge...@eniro.com>> wrote:
>> 
>>> I rolled back to 1.0.3
>> 
>>> If I understand this correctly, the peak when topology starts is because it 
>>> is trying to fill all the buffers, but I can not see that in 1.1.0.
>>> 
>>> 
>>> 
>> 
>>>> On 09 Aug 2016, at 12:10, Robert Metzger >>> <mailto:rmetz...@apache.org>> wrote:
>>>> 
>>> 
>>>> Which source are you using?
>>>> 
>>>> On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu >>> <mailto:andrew.ge...@eniro.com>> wrote:
>>>> Hi Robert
>>>> 
>>>> 
>>>> Thanks for the quick reply, I guess I’m one of the early birds.
>>>> Yes, it is much slower, I’m not sure why, I copied slaves, masters, 
>>>> log4j.properties and flink-conf.yaml directly from 1.0.3
>>>> I have parallelization 1 on my sources, I can increase that to achieve the 
>>>> same speed, but I’m interested to know why is that.
>>>> 
>>>> 
>>>> Thanks!
>>>> 
>>>> 
>>>> Andrew
>>>>> On 09 Aug 2016, at 11:47, Robert Metzger >>>> <mailto:rmetz...@apache.org>> wrote:
>>>>> 
>>>>> Hi Andrew,
>>>>> 
>>>>> here is the release announcement, with a list of all changes: 
>>>>> http://flink.apache.org/news/2016/08/08/release-1.1.0.html 
>>>>> <http://flink.apache.org/news/2016/08/08/release-1.1.0.html>, 
>>>>> http://flink.apache.org/blog/release_1.1.0-changelog.html 
>>>>> <http://flink.apache.org/blog/release_1.1.0-changelog.html>
>>>>> 
>>>>> What does the chart say? Are the results different? is Flink faster or 
>>>>> slower now?
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> Robert
>>>>> 
>>>>> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu 

Re: Release notes 1.1.0?

2016-08-09 Thread Andrew Ge Wu
Oh sorry, I missed that part. No, I’m not explicitly setting that.


> On 09 Aug 2016, at 15:29, Aljoscha Krettek  wrote:
> 
> Hi,
> are you setting a StreamTimeCharacteristic, i.e. 
> env.setStreamTimeCharacteristic?
> 
> Cheers,
> Aljoscha
> 
> On Tue, 9 Aug 2016 at 14:52 Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Aljoscha
> 
> 
> Plan attached, there are split streams and union operations around, but here 
> is how windows are created
> 
> 
> Let me know if I’m doing something out of the ordinary here.
> 
> 
> 
> Thanks!
> 
> 
> Andrew
>> On 09 Aug 2016, at 14:18, Aljoscha Krettek > <mailto:aljos...@apache.org>> wrote:
>> 
> 
>> Hi,
>> could you maybe post how exactly you specify the window? Also, did you set a 
>> "stream time characteristic", for example EventTime?
>> 
>> That could help us pinpoint the problem.
>> 
>> Cheers,
>> Aljoscha
>> 
> 
>> On Tue, 9 Aug 2016 at 12:42 Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
> 
>> I rolled back to 1.0.3
> 
>> If I understand this correctly, the peak when topology starts is because it 
>> is trying to fill all the buffers, but I can not see that in 1.1.0.
>> 
>> 
>> 
> 
>>> On 09 Aug 2016, at 12:10, Robert Metzger >> <mailto:rmetz...@apache.org>> wrote:
>>> 
>> 
>>> Which source are you using?
>>> 
>>> On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu >> <mailto:andrew.ge...@eniro.com>> wrote:
>>> Hi Robert
>>> 
>>> 
>>> Thanks for the quick reply, I guess I’m one of the early birds.
>>> Yes, it is much slower, I’m not sure why, I copied slaves, masters, 
>>> log4j.properties and flink-conf.yaml directly from 1.0.3
>>> I have parallelization 1 on my sources, I can increase that to achieve the 
>>> same speed, but I’m interested to know why is that.
>>> 
>>> 
>>> Thanks!
>>> 
>>> 
>>> Andrew
>>>> On 09 Aug 2016, at 11:47, Robert Metzger >>> <mailto:rmetz...@apache.org>> wrote:
>>>> 
>>>> Hi Andrew,
>>>> 
>>>> here is the release announcement, with a list of all changes: 
>>>> http://flink.apache.org/news/2016/08/08/release-1.1.0.html 
>>>> <http://flink.apache.org/news/2016/08/08/release-1.1.0.html>, 
>>>> http://flink.apache.org/blog/release_1.1.0-changelog.html 
>>>> <http://flink.apache.org/blog/release_1.1.0-changelog.html>
>>>> 
>>>> What does the chart say? Are the results different? is Flink faster or 
>>>> slower now?
>>>> 
>>>> 
>>>> Regards,
>>>> Robert
>>>> 
>>>> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu >>> <mailto:andrew.ge...@eniro.com>> wrote:
>>>> Hi,
>>>> 
>>>> We found out there is a new stable version released: 1.1.0 but we can not 
>>>> find any release note.
>>>> Do anyone know where to find it?
>>>> 
>>>> 
>>>> We are experiencing some change of behavior; I’m not sure if it is related.
>>>> 
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>> Andrew
>>>> 
>>>> 
>>> 
>>> 

Re: Release notes 1.1.0?

2016-08-09 Thread Andrew Ge Wu
I rolled back to 1.0.3

If I understand this correctly, the peak when the topology starts is because it 
is trying to fill all the buffers, but I cannot see that in 1.1.0.



> On 09 Aug 2016, at 12:10, Robert Metzger  wrote:
> 
> Which source are you using?
> 
> On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Robert
> 
> 
> Thanks for the quick reply, I guess I’m one of the early birds.
> Yes, it is much slower, I’m not sure why, I copied slaves, masters, 
> log4j.properties and flink-conf.yaml directly from 1.0.3
> I have parallelism 1 on my sources; I can increase that to achieve the 
> same speed, but I’m interested to know why that is.
> 
> 
> Thanks!
> 
> 
> Andrew
>> On 09 Aug 2016, at 11:47, Robert Metzger > <mailto:rmetz...@apache.org>> wrote:
>> 
>> Hi Andrew,
>> 
>> here is the release announcement, with a list of all changes: 
>> http://flink.apache.org/news/2016/08/08/release-1.1.0.html 
>> <http://flink.apache.org/news/2016/08/08/release-1.1.0.html>, 
>> http://flink.apache.org/blog/release_1.1.0-changelog.html 
>> <http://flink.apache.org/blog/release_1.1.0-changelog.html>
>> 
>> What does the chart say? Are the results different? is Flink faster or 
>> slower now?
>> 
>> 
>> Regards,
>> Robert
>> 
>> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
>> Hi,
>> 
>> We found out there is a new stable version released, 1.1.0, but we cannot 
>> find any release notes.
>> Does anyone know where to find them?
>> 
>> 
>> We are experiencing some change of behavior; I’m not sure if it is related.
>> 
>> 
>> 
>> Thanks
>> 
>> 
>> Andrew
>> 
>> 
> 
> 
> 




Re: Release notes 1.1.0?

2016-08-09 Thread Andrew Ge Wu
We wrote our own source. I noticed our back pressure changed from OK to high 
after rebalance().timeWindowAll(); if there is no obvious change in that, the 
problem may be in our function after this point.


> On 09 Aug 2016, at 12:10, Robert Metzger  wrote:
> 
> Which source are you using?
> 
> On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Robert
> 
> 
> Thanks for the quick reply; I guess I'm one of the early birds.
> Yes, it is much slower, and I'm not sure why; I copied slaves, masters, 
> log4j.properties, and flink-conf.yaml directly from 1.0.3.
> I have parallelism 1 on my sources; I can increase that to achieve the same 
> speed, but I'm interested to know why that is.
> 
> 
> Thanks!
> 
> 
> Andrew
>> On 09 Aug 2016, at 11:47, Robert Metzger > <mailto:rmetz...@apache.org>> wrote:
>> 
>> Hi Andrew,
>> 
>> here is the release announcement, with a list of all changes: 
>> http://flink.apache.org/news/2016/08/08/release-1.1.0.html 
>> <http://flink.apache.org/news/2016/08/08/release-1.1.0.html>, 
>> http://flink.apache.org/blog/release_1.1.0-changelog.html 
>> <http://flink.apache.org/blog/release_1.1.0-changelog.html>
>> 
>> What does the chart say? Are the results different? Is Flink faster or 
>> slower now?
>> 
>> 
>> Regards,
>> Robert
>> 
>> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu > <mailto:andrew.ge...@eniro.com>> wrote:
>> Hi,
>> 
>> We found out there is a new stable version released, 1.1.0, but we cannot 
>> find any release notes.
>> Does anyone know where to find them?
>> 
>> 
>> We are experiencing some change of behavior; I'm not sure if it is related.
>> 
>> 
>> 
>> Thanks
>> 
>> 
>> Andrew
>> 
>> 
> 
> 
> 




Re: Release notes 1.1.0?

2016-08-09 Thread Andrew Ge Wu
Hi Robert


Thanks for the quick reply; I guess I'm one of the early birds.
Yes, it is much slower, and I'm not sure why; I copied slaves, masters, 
log4j.properties, and flink-conf.yaml directly from 1.0.3.
I have parallelism 1 on my sources; I can increase that to achieve the same 
speed, but I'm interested to know why that is.


Thanks!


Andrew
> On 09 Aug 2016, at 11:47, Robert Metzger  wrote:
> 
> Hi Andrew,
> 
> here is the release announcement, with a list of all changes: 
> http://flink.apache.org/news/2016/08/08/release-1.1.0.html 
> <http://flink.apache.org/news/2016/08/08/release-1.1.0.html>, 
> http://flink.apache.org/blog/release_1.1.0-changelog.html 
> <http://flink.apache.org/blog/release_1.1.0-changelog.html>
> 
> What does the chart say? Are the results different? Is Flink faster or slower 
> now?
> 
> 
> Regards,
> Robert
> 
> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi,
> 
> We found out there is a new stable version released, 1.1.0, but we cannot 
> find any release notes.
> Does anyone know where to find them?
> 
> 
> We are experiencing some change of behavior; I'm not sure if it is related.
> 
> 
> 
> Thanks
> 
> 
> Andrew
> 
> 




Release notes 1.1.0?

2016-08-09 Thread Andrew Ge Wu
Hi,

We found out there is a new stable version released, 1.1.0, but we cannot find 
any release notes.
Does anyone know where to find them?


We are experiencing some change of behavior; I'm not sure if it is related.



Thanks


Andrew


Re: Parallel execution on AllWindows

2016-08-04 Thread Andrew Ge Wu
Thanks for the quick response, everything is clear!

cheers!

Andrew
> On 03 Aug 2016, at 18:11, Aljoscha Krettek  wrote:
> 
> Hi,
> "rebalance" simply specifies the strategy to use when sending elements 
> downstream to the next operator(s). There is no interaction or competition 
> between the parallel window operator instances. Each will do windowing 
> locally based on the elements that it receives from upstream.
> 
> Cheers,
> Aljoscha
> 
> On Wed, 3 Aug 2016 at 08:26  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi Aljoscha
> 
> Thanks for the explanation.
> One other thing: when you say there is no coordination, does that mean 
> rebalance() will not be honored, and each window operator instance will 
> compete for the next available window?
> 
> Thanks
> 
> Andrew
> From mobile
> 
> From: Aljoscha Krettek
> Sent: Wednesday, August 3, 17:11
> Subject: Re: Parallel execution on AllWindows
> To: user@flink.apache.org <mailto:user@flink.apache.org>
> Hi,
> 
> if you manually force a parallelism different from 1 after a *windowAll() 
> then you will get parallel execution of your window. For example, if you do 
> this:
> 
> input.countWindowAll(100).setParallelism(5)
> 
> then you will get five parallel instances of the window operator that each 
> wait for 100 elements before they fire the window. There is no global 
> coordination between the parallel instances that would allow it to fire once 
> 100 elements are received across the parallel instances.
> 
> Cheers,
> 
> Aljoscha
> 
> On Wed, 3 Aug 2016 at 05:10 Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> 
>> Hi,
>> 
>> I have a task where I want to apply a count window on a stream and process 
>> the windows batch by batch.
>> 
>> Executing a count window may take some time, so I want it to be executed in 
>> parallel.
>> 
>> I read this part of the documentation when I found it automatically reduced 
>> the parallelism to 1:
>> 
>> * Note: This operation can be inherently non-parallel since all elements 
>> have to pass through
>> * the same operator instance. (Only for special cases, such as aligned time 
>> windows is
>> * it possible to perform this operation in parallel).
>> 
>> (It looks like the Javadoc is copied from timeWindowAll.)
>> 
>> If I force the all-window function to run in parallel, what will happen?
>> 
>> Will a time/count window be broadcast to all instances of the function, or 
>> will it be sent to one of the instances so I can parallelize my work?
>> 
>> 
>> Thanks!
>> 
>> 
>> 
>> Andrew
>> 
> 
> 




Re: Parallel execution on AllWindows

2016-08-03 Thread andrew . ge-wu


Hi Aljoscha

Thanks for the explanation.
One other thing: when you say there is no coordination, does that mean 
rebalance() will not be honored, and each window operator instance will compete 
for the next available window?

Thanks

Andrew
From mobile



From: Aljoscha Krettek
Sent: Wednesday, August 3, 17:11
Subject: Re: Parallel execution on AllWindows
To: user@flink.apache.org

Hi,

if you manually force a parallelism different from 1 after a *windowAll() then 
you will get parallel execution of your window. For example, if you do this:

input.countWindowAll(100).setParallelism(5)

then you will get five parallel instances of the window operator that each wait 
for 100 elements before they fire the window. There is no global coordination 
between the parallel instances that would allow it to fire once 100 elements 
are received across the parallel instances.
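This point can be sketched without any Flink APIs at all: simulate rebalance()'s round-robin delivery into five independent count-of-100 buffers. Everything below (class name, element counts) is illustrative, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelCountWindowDemo {
    public static void main(String[] args) {
        int parallelism = 5, windowSize = 100, totalElements = 499;
        List<List<Integer>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }
        int windowsFired = 0;
        // rebalance() sends elements round-robin; each window instance then
        // counts locally, with no coordination across instances.
        for (int e = 0; e < totalElements; e++) {
            List<Integer> local = buffers.get(e % parallelism);
            local.add(e);
            if (local.size() == windowSize) {
                windowsFired++;
                local.clear();
            }
        }
        // Four instances reached 100 elements; the fifth sits at 99 and never
        // fires, even though 499 elements arrived globally.
        System.out.println("windows fired: " + windowsFired);
        System.out.println("pending in instance 4: " + buffers.get(4).size());
    }
}
```

With 499 elements, four instances fire and the fifth holds 99 elements indefinitely, which is exactly the "no global coordination" behavior described above.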



Cheers,

Aljoscha

On Wed, 3 Aug 2016 at 05:10 Andrew Ge Wu  wrote:

Hi,

I have a task where I want to apply a count window on a stream and process the 
windows batch by batch.
Executing a count window may take some time, so I want it to be executed in 
parallel.
I read this part of the documentation when I found it automatically reduced the 
parallelism to 1:

* Note: This operation can be inherently non-parallel since all elements have 
to pass through
* the same operator instance. (Only for special cases, such as aligned time 
windows is
* it possible to perform this operation in parallel).

(It looks like the Javadoc is copied from timeWindowAll.)

If I force the all-window function to run in parallel, what will happen?
Will a time/count window be broadcast to all instances of the function, or will 
it be sent to one of the instances so I can parallelize my work?

Thanks!

Andrew





Parallel execution on AllWindows

2016-08-03 Thread Andrew Ge Wu
Hi,

I have a task where I want to apply a count window on a stream and process the 
windows batch by batch.
Executing a count window may take some time, so I want it to be executed in 
parallel.
I read this part of the documentation when I found it automatically reduced the 
parallelism to 1:

* Note: This operation can be inherently non-parallel since all elements have 
to pass through
* the same operator instance. (Only for special cases, such as aligned time 
windows is
* it possible to perform this operation in parallel).
(It looks like the Javadoc is copied from timeWindowAll.)

If I force the all-window function to run in parallel, what will happen?
Will a time/count window be broadcast to all instances of the function, or will 
it be sent to one of the instances so I can parallelize my work?


Thanks!



Andrew


Re: Limit buffer size for a job

2016-04-13 Thread Andrew Ge Wu
Thanks guys for the explanation, I will give it a try.
After all the buffers are filled, the back pressure did its job; it has worked 
well so far, but I will definitely give controlling the latency a try.


Thanks again!


Andrew
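For later readers: the latency control Ufuk links to below boils down to flushing an output buffer either when it fills up or when a timeout expires. A JDK-only sketch of that size-or-timeout idea (TimedBuffer and all the numbers are invented for illustration; this is not Flink's internal code):

```java
import java.util.ArrayList;
import java.util.List;

public class BufferTimeoutDemo {
    // Flush when the buffer is full OR when timeoutMillis has passed since the
    // last flush: the tradeoff behind Flink's buffer timeout setting.
    static class TimedBuffer<T> {
        final int capacity;
        final long timeoutMillis;
        final List<T> items = new ArrayList<>();
        long lastFlush = 0L;

        TimedBuffer(int capacity, long timeoutMillis) {
            this.capacity = capacity;
            this.timeoutMillis = timeoutMillis;
        }

        /** Returns the flushed batch, or null if neither condition is met yet. */
        List<T> offer(T item, long now) {
            items.add(item);
            if (items.size() >= capacity || now - lastFlush >= timeoutMillis) {
                List<T> batch = new ArrayList<>(items);
                items.clear();
                lastFlush = now;
                return batch;
            }
            return null;
        }
    }

    public static void main(String[] args) {
        // Capacity 100 but a 10 ms timeout: with records arriving 3 ms apart,
        // the timeout fires long before 100 records pile up, so a high-priority
        // record waits at most ~10 ms instead of queueing behind a full buffer.
        TimedBuffer<Integer> buffer = new TimedBuffer<>(100, 10);
        for (int i = 0; i < 5; i++) {
            List<Integer> batch = buffer.offer(i, i * 3L);
            if (batch != null) {
                System.out.println("flushed " + batch);
            }
        }
    }
}
```

In Flink itself the knob is env.setBufferTimeout(timeoutMillis) on the StreamExecutionEnvironment; a small timeout keeps batches short at the cost of some throughput.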



> On 11 Apr 2016, at 18:19, Stephan Ewen  wrote:
> 
> Hi!
> 
> Ufuk's suggestion explains how to buffer less between Flink operators.
> 
> Is that what you were looking for, or are you looking for a way to fetch more 
> fine grained in the source from the message queue?
> What type of source are you using?
> 
> Greetings,
> Stephan
> 
> 
> 
> 
> On Mon, Apr 11, 2016 at 5:02 PM, Ufuk Celebi  <mailto:u...@apache.org>> wrote:
> Hey Andrew,
> 
> take a look at this here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html#controlling-latency
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html#controlling-latency>
> 
> Does this help?
> 
> – Ufuk
> 
> On Thu, Apr 7, 2016 at 3:04 PM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> > Hi guys
> >
> > We have a prioritized queue, where high-priority items can jump the queue, 
> > and we do not want to cache too many records in the buffer.
> > Is there a way to configure my streaming source to use a smaller buffer, so 
> > the source always fetches the latest high-priority records?
> >
> > Any suggestion? thanks!
> >
> >
> > Andrew
> 




Limit buffer size for a job

2016-04-07 Thread Andrew Ge Wu
Hi guys

We have a prioritized queue, where high-priority items can jump the queue, and 
we do not want to cache too many records in the buffer.
Is there a way to configure my streaming source to use a smaller buffer, so the 
source always fetches the latest high-priority records?

Any suggestion? thanks!


Andrew


Re: Java 8 and keyBy in 1.0.0

2016-03-30 Thread Andrew Ge Wu
Thanks for the quick reply.

Looks like Chesnay Schepler already found the ticket:

https://issues.apache.org/jira/browse/FLINK-3138 
<https://issues.apache.org/jira/browse/FLINK-3138>

Of course it would be great if this could be fixed soon, but it could also be 
made a bit more obvious somewhere.

Thanks!


Andrew
> On 30 Mar 2016, at 12:39, Timo Walther  wrote:
> 
> I will assign this issue to me and fix it soon, if that's ok?
> 
> Regards,
> Timo
> 
> 
> On 30.03.2016 11:30, Stephan Ewen wrote:
>> Looks like something we should fix though. Probably just needs a case 
>> distinction in the TypeExtractor.
>> 
>> @Andrew, can you post the stack trace into the me linked issue?
>> 
>> We'll try to get at it until the next release...
>> 
>> On Wed, Mar 30, 2016 at 10:58 AM, Chesnay Schepler < 
>> <mailto:ches...@apache.org>ches...@apache.org <mailto:ches...@apache.org>> 
>> wrote:
>> based on https://issues.apache.org/jira/browse/FLINK-3138 
>> <https://issues.apache.org/jira/browse/FLINK-3138> this is not supported for 
>> non-static methods.
>> 
>> 
>> On 30.03.2016 10:33, Andrew Ge Wu wrote:
>>> Hi,
>>> 
>>> This is not very obvious and looks like a bug.
>>> 
>>> I have a lambda expression to get key from objects in stream:
>>> 
>>> This works:
>>> stream.keyBy(value -> value.getId())
>>> 
>>> This does not:
>>> stream.keyBy(myClass::getId)
>>> 
>>> Exception:
>>> 
>>> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
>>> at 
>>> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:294)
>>> at 
>>> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:253)
>>> at 
>>> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:246)
>>> at 
>>> org.apache.flink.streaming.api.datastream.KeyedStream.(KeyedStream.java:87)
>>> at 
>>> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:241)
>>> 
>>> 
>>> Can someone confirm this? or is there a better way to do it?
>>> 
>>> 
>>> Thanks!
>>> 
>>> 
>>> Andrew
>>> 
>> 
>> 
> 




Java 8 and keyBy in 1.0.0

2016-03-30 Thread Andrew Ge Wu
Hi,

This is not very obvious and looks like a bug.

I have a lambda expression to extract the key from objects in the stream:

This works:
stream.keyBy(value -> value.getId())

This does not:
stream.keyBy(myClass::getId)

Exception:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:294)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:253)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:246)
at 
org.apache.flink.streaming.api.datastream.KeyedStream.(KeyedStream.java:87)
at 
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:241)


Can someone confirm this? or is there a better way to do it?
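A plain-JDK observation that may explain the difference (no Flink involved; the link to the TypeExtractor failure is my reading of FLINK-3138, not confirmed here): javac compiles a lambda body into a synthetic lambda$... method of the enclosing class, which reflection-based tooling can inspect, while a method reference compiles straight to the target method and leaves no such synthetic method behind:

```java
import java.util.Arrays;
import java.util.function.Function;

public class MethodRefDemo {
    public static void main(String[] args) {
        // The lambda below forces javac to emit a synthetic lambda$... method
        // in this class; reflection can inspect its signature.
        Function<String, Integer> lambda = s -> s.length();
        // The method reference compiles straight to String.length() via
        // invokedynamic; no synthetic method with call-site generics exists,
        // which is plausibly what trips up Flink's TypeExtractor (FLINK-3138).
        Function<String, Integer> methodRef = String::length;

        boolean hasSyntheticLambdaMethod = Arrays
                .stream(MethodRefDemo.class.getDeclaredMethods())
                .anyMatch(m -> m.isSynthetic() && m.getName().startsWith("lambda$"));
        System.out.println("synthetic lambda method present: " + hasSyntheticLambdaMethod);
        System.out.println(lambda.apply("abc") + " " + methodRef.apply("abcd"));
    }
}
```

Both forms behave identically at runtime; only the reflective metadata differs, so sticking with the explicit lambda form is a safe workaround until the ticket is fixed.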


Thanks!


Andrew


Re: state.backend.fs.checkpointdir setting

2016-02-22 Thread Andrew Ge Wu
Hi Robert

I just checked my settings on the TaskManagers (they were configured 
separately): they were misconfigured.
My job now runs correctly after I reconfigured them.

Thanks!

Andrew
> On 22 Feb 2016, at 09:41, Robert Metzger  wrote:
> 
> Hi,
> 
> how is your cluster setup? Do you have multiple machines, or only one?
> Did you copy the configuration to all machines?
> 
> 
> 
> On Fri, Feb 19, 2016 at 6:08 PM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi All,
> 
> I have been experiencing an error that breaks my HA standalone setup.
> 
> The cluster starts up just fine, but when I deploy an application to it, I 
> get this exception:
> 
> 
> java.lang.Exception: Call to registerInputOutput() of invokable failed
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
> Cannot create the file system state backend: The configuration does not 
> specify the checkpoint directory 'state.backend.fs.checkpointdir'
> at 
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:517)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:171)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
> 
> Here’s my configuration:
> 
> ….
> #
> # Note: You need to set the state backend to 'filesystem' and the checkpoint
> # directory (see above) before configuring the storageDir.
> #
> # recovery.zookeeper.storageDir: hdfs:///recovery
> recovery.zookeeper.path.root: /flink
> state.backend: filesystem
> 
> state.backend.fs.checkpointdir: file:///apps/flink/checkpoints/
> recovery.zookeeper.storageDir: file:///apps/flink/recovery/
> 
> 
> 
> And here’s my flink folder.
> 
> drwxr-xr-x 11 {user} {group}   4.0K Feb 19 17:31 .
> drwxrwxr-x  6 {user} {group}   4.0K Feb 19 11:25 ..
> -rw-r--r--  1 {user} {group}17K Nov 22 13:52 LICENSE
> -rw-r--r--  1 {user} {group}779 Nov 22 13:52 NOTICE
> -rw-r--r--  1 {user} {group}   1.3K Nov 22 13:52 README.txt
> drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 bin
> drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:31 checkpoints
> drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:37 conf
> drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 examples
> drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 lib
> drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:48 log
> drwxr-xr-x  3 {user} {group}   4.0K Feb 19 17:48 recovery
> drwxr-xr-x  3 {user} {group}   4.0K Nov 22 13:52 resources
> drwxr-xr-x  5 {user} {group}   4.0K Nov 22 13:52 tools
> 
> The recovery folder works just fine, with blobs in it.
> 
> 
> 
> Thanks!
> 
> 
> 
> Andrew
> 




state.backend.fs.checkpointdir setting

2016-02-19 Thread Andrew Ge Wu
Hi All,

I have been experiencing an error that breaks my HA standalone setup.

The cluster starts up just fine, but when I deploy an application to it, I get 
this exception:


java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.backend.fs.checkpointdir'
at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:517)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:171)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

Here’s my configuration:

….
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
# recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.path.root: /flink
state.backend: filesystem

state.backend.fs.checkpointdir: file:///apps/flink/checkpoints/
recovery.zookeeper.storageDir: file:///apps/flink/recovery/



And here’s my flink folder.  

drwxr-xr-x 11 {user} {group}   4.0K Feb 19 17:31 .
drwxrwxr-x  6 {user} {group}   4.0K Feb 19 11:25 ..
-rw-r--r--  1 {user} {group}17K Nov 22 13:52 LICENSE
-rw-r--r--  1 {user} {group}779 Nov 22 13:52 NOTICE
-rw-r--r--  1 {user} {group}   1.3K Nov 22 13:52 README.txt
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 bin
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:31 checkpoints
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:37 conf
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 examples
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 lib
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:48 log
drwxr-xr-x  3 {user} {group}   4.0K Feb 19 17:48 recovery
drwxr-xr-x  3 {user} {group}   4.0K Nov 22 13:52 resources
drwxr-xr-x  5 {user} {group}   4.0K Nov 22 13:52 tools

The recovery folder works just fine, with blobs in it. 



Thanks!



Andrew


Re: Very old dependencies and solutions

2016-02-18 Thread Andrew Ge Wu
Thanks Stephan, problem solved.
Here is my configuration:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>my.main.class</mainClass>
                    </transformer>
                </transformers>
                <relocations>
                    <relocation>
                        <pattern>org.apache.http</pattern>
                        <shadedPattern>org.apache.shaded.http</shadedPattern>
                    </relocation>
                    <relocation>
                        <pattern>org.apache.commons</pattern>
                        <shadedPattern>org.apache.shaded.commons</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

/Andrew

> On 18 Feb 2016, at 14:02, Stephan Ewen  wrote:
> 
> Hi!
> 
> A lot of those dependencies are pulled in by Hadoop (for example the 
> configuration / HTTP components).
> 
> In 1.0-SNAPSHOT, the HTTP components dependency has been shaded away in 
> Hadoop, so it should not bother you any more.
> 
> One solution you can always do is to "shade" your dependencies in your fat 
> jar. Shading relocates the classes (and rewrites your code) so the 
> dependencies do not conflict, but co-exist in different namespaces.
> 
> Greetings,
> Stephan
> 
> 
> On Thu, Feb 18, 2016 at 1:56 PM, Andrew Ge Wu  <mailto:andrew.ge...@eniro.com>> wrote:
> Hi guys,
> 
> You have probably noticed: I found a lot of old dependencies (http component 
> 3.1, apache configuration 1.6, etc.) in Flink, which lead to errors like 
> this:
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.http.conn.ssl.SSLConnectionSocketFactory
> 
> Is there any way to get around this?
> 
> Thanks!
> 
> 
> Andrew
> 




Very old dependencies and solutions

2016-02-18 Thread Andrew Ge Wu
Hi guys, 

You probably have noticed: there are a lot of old dependencies (HttpComponents 
3.1 / Apache Commons Configuration 1.6, etc.) in Flink, which leads to errors 
like this:
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.http.conn.ssl.SSLConnectionSocketFactory

Is there any way to get around this? 

Thanks!


Andrew


Re: Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Yes, I found the Scala 2.11-specific dependency. Thanks!



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${apache.flink.version}</version>
</dependency>


Andrew

> On 08 Feb 2016, at 14:18, Andrew Ge Wu  wrote:
> 
> Thanks Max
> 
> My local and remote environments are running Scala code runner version 2.11.7 
> -- Copyright 2002-2013, LAMP/EPFL.
> I downloaded the Scala 2.11 binary 
> (//apache.mirrors.spacedump.net/flink/flink-0.10.1/flink-0.10.1-bin-hadoop27-scala_2.11.tgz).
> Is there a different version of the client library for Scala 2.11?
> 
> 
> Best, 
> 
> Andrew
> 
>> On 08 Feb 2016, at 11:30, Maximilian Michels  wrote:
>> 
>> Hi Andrew,
>> 
>> It appears that you're using two different versions of the Scala
>> library in your Flink job. Please make sure you use either 2.10 or
>> 2.11 but not both at the same time.
>> 
>> Best,
>> Max
>> 
>> On Mon, Feb 8, 2016 at 10:30 AM, Andrew Ge Wu  wrote:
>>> Hi All
>>> 
>>> I’m new to Flink and have reached the step of submitting to a remote 
>>> cluster, which failed with the following message:
>>> 
>>> Association with remote system [akka.tcp://flink@127.0.0.1:61231] has 
>>> failed, address is now gated for [5000] ms. Reason is: [scala.Option; local 
>>> class incompatible: stream classdesc serialVersionUID = 
>>> -2062608324514658839, local class serialVersionUID = -114498752079829388].
>>> 
>>> I have double-checked that my client and server versions are the same 
>>> (0.10.1), but my Java versions differ slightly:
>>> Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
>>> vs.
>>> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>>> 
>>> Is Java the issue, or is there anything else I may be missing?
>>> 
>>> 
>>> Many thanks
>>> 
>>> 
>>> Andrew
>>> 
>>> 
> 




Re: Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Thanks Max

My local and remote environments are running Scala code runner version 2.11.7 
-- Copyright 2002-2013, LAMP/EPFL.
I downloaded the Scala 2.11 binary 
(//apache.mirrors.spacedump.net/flink/flink-0.10.1/flink-0.10.1-bin-hadoop27-scala_2.11.tgz).
Is there a different version of the client library for Scala 2.11?


Best, 

Andrew

> On 08 Feb 2016, at 11:30, Maximilian Michels  wrote:
> 
> Hi Andrew,
> 
> It appears that you're using two different versions of the Scala
> library in your Flink job. Please make sure you use either 2.10 or
> 2.11 but not both at the same time.
> 
> Best,
> Max
> 
> On Mon, Feb 8, 2016 at 10:30 AM, Andrew Ge Wu  wrote:
>> Hi All
>> 
>> I’m new to Flink and have reached the step of submitting to a remote 
>> cluster, which failed with the following message:
>> 
>> Association with remote system [akka.tcp://flink@127.0.0.1:61231] has 
>> failed, address is now gated for [5000] ms. Reason is: [scala.Option; local 
>> class incompatible: stream classdesc serialVersionUID = 
>> -2062608324514658839, local class serialVersionUID = -114498752079829388].
>> 
>> I have double-checked that my client and server versions are the same 
>> (0.10.1), but my Java versions differ slightly:
>> Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
>> vs.
>> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>> 
>> Is Java the issue, or is there anything else I may be missing?
>> 
>> 
>> Many thanks
>> 
>> 
>> Andrew
>> 
>> 




Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Hi All

I’m new to Flink and have reached the step of submitting to a remote cluster, 
which failed with the following message:

Association with remote system [akka.tcp://flink@127.0.0.1:61231] has failed, 
address is now gated for [5000] ms. Reason is: [scala.Option; local class 
incompatible: stream classdesc serialVersionUID = -2062608324514658839, local 
class serialVersionUID = -114498752079829388].
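The "local class incompatible" error above is Java serialization comparing serialVersionUIDs. A minimal sketch (plain Java, not Flink code) of where the mismatched numbers come from: when a Serializable class declares no explicit serialVersionUID, the JVM derives one from the class's structure, so the "same" class compiled against different library versions can produce different UIDs.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Sketch: a Serializable class without an explicit serialVersionUID gets
// one computed by the JVM from the class's shape (name, fields, methods,
// interfaces). If sender and receiver load structurally different builds
// of the class, the computed UIDs differ and deserialization fails with
// "local class incompatible", as in the error above.
public class UidDemo {
    static class Payload implements Serializable {
        int value; // no explicit serialVersionUID declared
    }

    public static void main(String[] args) {
        long uid = ObjectStreamClass.lookup(Payload.class).getSerialVersionUID();
        System.out.println("JVM-computed serialVersionUID = " + uid);
    }
}
```

Declaring an explicit `private static final long serialVersionUID` pins the value, but that only helps for classes you control, not for library classes such as scala.Option.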

I have double-checked that my client and server versions are the same (0.10.1), 
but my Java versions differ slightly:
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
vs.
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Is Java the issue, or is there anything else I may be missing?


Many thanks


Andrew

