Re: Stream job failed after increasing number retained checkpoints

2018-01-09 Thread Piotr Nowojski
Hi,

Increasing Akka’s timeouts is rarely a solution to any problem: it either does 
not help, or it just masks the issue and makes it less visible. But yes, it is 
possible to bump the limits: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
 


I don’t think that state.checkpoints.num-retained was designed to handle such 
a large number of retained checkpoints, so there may be some known or unknown 
limitations. Stefan, do you know anything in this regard?

In parallel, as with any other Akka timeout, you should track down its root 
cause. This one warning line doesn’t tell much. Where does it come from? 
Client log? Job manager log? Task manager log? Please search on the opposite 
side of the timed-out connection for a possible root cause of the timeout, 
including:
- possible errors/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in 
the logs)
- machine health (CPU usage, disk usage, network connections)
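
To spot such gaps without reading the whole log by eye, something like the 
following could help. This is only a minimal sketch in plain Java: the class 
name LogGapFinder and the 23-character timestamp prefix (as in 
"2018-01-08 22:26:43,037") are assumptions, so adjust them to your log layout.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink.
public class LogGapFinder {
    // Assumed timestamp prefix, e.g. "2018-01-08 22:26:43,037".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Returns one description per pair of consecutive timestamped lines more than maxGap apart. */
    public static List<String> findGaps(List<String> lines, Duration maxGap) {
        List<String> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            if (line.length() < 23) {
                continue; // too short to start with a timestamp
            }
            LocalDateTime current;
            try {
                current = LocalDateTime.parse(line.substring(0, 23), TS);
            } catch (DateTimeParseException e) {
                continue; // stack trace or continuation line
            }
            if (previous != null) {
                Duration gap = Duration.between(previous, current);
                if (gap.compareTo(maxGap) > 0) {
                    gaps.add(previous + " -> " + current + " (" + gap.getSeconds() + " s)");
                }
            }
            previous = current;
        }
        return gaps;
    }
}
```

Running it over the job manager and task manager logs with a threshold of, say, 
30 seconds should point you at the time ranges worth a closer look.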

Piotrek

> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez 
>  wrote:
> 
> Hello,
> 
> I have several stream jobs running (v. 1.3.1) in production which always 
> fail after running for a fixed period of around 30h. This is the 
> WARN trace before the failure:
> 
> Association with remote system 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, 
> address is now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: 
> [No response from remote for outbound association. Handshake timed out after 
> [2 ms].
> 
> The main change made to the job configuration was to increase 
> state.checkpoints.num-retained from 1 to 2880. I am using asynchronous 
> RocksDB snapshots to persist the state. (I attach some screenshots with 
> the checkpoint conf from the web UI.)
> 
> Is my assumption correct that the increase of checkpoints.num-retained is 
> causing the problem? Is there any known issue regarding this?
> Besides, is there any way to increase the Akka handshake timeout from the 
> current 2 ms to a higher value? I considered that it may be convenient to 
> increase the timeout to 1 minute instead.
> 
> BR
> 
> 



Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-10 Thread Piotr Nowojski
Hi,

Search both the job manager and task manager logs for the IP address(es) and 
port(s) that have timed out. First of all, make sure that the nodes are visible 
to each other using a simple ping. Afterwards, please check that the ports that 
timed out are open and not blocked by some firewall (telnet).
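
If telnet is not installed on the machines, roughly the same check can be done 
from Java. A minimal sketch using only java.net.Socket; the class name 
PortCheck and the 3 second timeout are my own choices for illustration:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink.
public class PortCheck {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck <host> <port>
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Run it on the task manager host against the job manager's address and port, and 
the other way around, since a firewall may block only one direction.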

You can search the documentation for the configuration parameters with “port” 
in their name:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html 

But note that many of them are random by default.

Piotrek

> On 9 Jan 2018, at 17:56, Reza Samee  wrote:
> 
> 
> I'm running a flink-cluster (a mini one with just one node); but the problem 
> is that my TaskManager can't reach my JobManager!
> 
> Here are logs from TaskManager
> ...
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 20, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 21, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 22, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 23, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 24, timeout: 30 seconds)
> ...
> 
> My "JobManager UI" shows my TaskManager with this Path & ID: 
> "akka://flink/deadLetters" ( in TaskManagers tab)
> And I found these lines in my JobManager stdout:
> 
> Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager#-275619168] - leader session null
> TaskManager ResourceID{resourceId='1132cbdaf2d8204e5e42e321e8592754'} has 
> started.
> Registered TaskManager at MY_PRIV_IP (akka://flink/deadLetters) as 
> 7d9568445b4557a74d05a0771a08ad9c. Current number of registered hosts is 1. 
> Current number of alive task slots is 20.
> 
> 
> What's the meaning of these lines? Where should I look for the solution?
> 
> 
> 
> 
> -- 
> رضا سامعی / http://samee.blog.ir 


Re: Custom Partitioning for Keyed Streams

2018-01-10 Thread Piotr Nowojski
Hi,

I don’t think it is possible to enforce scheduling of two keys to different 
nodes, since all of that is based on hashes.

In some cases, a pre-aggregation step (an initial aggregation done before the 
keyBy, followed by the final aggregation after the keyBy) can be the 
solution for handling data skew. With pre-aggregation, some (most?) of the 
work can be distributed and done on the source node instead of doing all of 
the heavy lifting on the destination node. It has not yet been merged into the 
Flink code, but it is entirely user space code, which you could copy and paste 
(and adjust) into your project. The pull request containing pre-aggregation is here:
https://github.com/apache/flink/pull/4626 

Please pay attention to the limitations of this code (documented in the 
Javadoc).

If the above code doesn’t work for you for whatever reason, you can also try to 
implement some custom tailored pre-aggregation, such as having two keyBy steps, 
where in the first you artificially split the A and B keys into a couple of smaller 
ones and the second keyBy merges/squashes the results.
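
To illustrate the two-step idea, here is a small simulation in plain Java (no 
Flink API involved). The class SaltedAggregation, the "#" separator and the 
round-robin salt are assumptions made up for this sketch. In a real job the 
salt would have to be attached to the record in a map before the first keyBy, 
since a key selector has to be deterministic, and the approach only applies 
when the per-key result can be merged from partial results.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of two-stage ("salted") aggregation; not Flink code.
public class SaltedAggregation {
    /** Stage 1: count per (key, salt) pair, e.g. "A#0", "A#1", ... */
    public static Map<String, Long> partialCounts(List<String> keys, int salts) {
        Map<String, Long> partial = new HashMap<>();
        int i = 0;
        for (String key : keys) {
            String salted = key + "#" + (i++ % salts); // round-robin salt
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and merge the partial counts per original key. */
    public static Map<String, Long> finalCounts(Map<String, Long> partial) {
        Map<String, Long> merged = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String salted = e.getKey();
            String key = salted.substring(0, salted.lastIndexOf('#'));
            merged.merge(key, e.getValue(), Long::sum);
        }
        return merged;
    }
}
```

With 4 salts, the 100 messages on key A end up as four partial results of 
roughly 25 messages each, which the hash partitioning can place on different 
task slots; the second step then only has to merge four small records per key.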

Piotrek

> On 9 Jan 2018, at 21:55, Martin, Nick  wrote:
> 
> I have a set of stateful operators that rely on keyed state. There is 
> substantial skew between keys (i.e. there will be 100 messages on keys A and 
> B, and 10 messages each on keys C-J), and key selection assignment is 
> dictated by the needs of my application such that I can’t choose keys in a 
> way that will eliminate the skew. The skew is somewhat predictable (i.e. I 
> know keys A and B will usually get roughly 10x as many messages as the rest) 
> and fairly consistent on different timescales (i.e. counting the messages on 
> each key for 30 seconds would provide a reasonably good guess as to the 
> distribution of messages that will be received over the next 10-20 minutes).
>  
> The problem I’m having is that often the high volume keys (A and B in the 
> example) end up on the same task slot and slow it down, while the low volume 
> ones are distributed across the other operators, leaving them underloaded. I 
> looked into the available physical partitioning functions, but it looks like 
> that functionality is generally incompatible with keyed streams, and I need 
> access to keyed state to do my actual processing. Is there any way I can get 
> better load balancing while using keyed state?
> 



Re: Datastream broadcast with KeyBy

2018-01-10 Thread Piotr Nowojski
Hi,

Could you elaborate on the problem that you are having? What 
exception(s) are you getting? I have tested this simple example and it 
seems to be working as expected:

// MyKeySelector and MyCoProcessFunction are user-defined classes (not shown).
DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector())
     .connect(confStream.broadcast())
     .process(new MyCoProcessFunction())
     .print();

Thanks, Piotrek

> On 10 Jan 2018, at 10:01, anujk  wrote:
> 
> Currently we have a Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
> 
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
> but it says both stream must be keyed.
> 
> Is there any way to make this work?
> 
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
> processFunction functionality.
> 
> Thanks,
> Anuj
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Piotr Nowojski
3dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: Discarded events (4/4) 
> (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects 
> from JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: 
> JobManager is no longer reachable
>   at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
>   at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>   at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>   at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Triggering cancellation of task code Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Sink: CounterSink (async 
> call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
> 
> 
> José Miguel Tejedor Fernández
> Server developer
> jose.fernan...@rovio.com
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com
> 
> 
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
> 
> there is no known limitation in the strict sense, but you might run out of 
> DFS space or job manager memory if you keep around a huge number of checkpoints. 
> I wonder what reason you might have for wanting such a huge number of 
> retained checkpoints? Usually keeping one checkpoint should do the job, maybe 
> a couple more if you are very afraid of corruption that goes beyond your 
> DFS's capabilities to handle it. Is there any reason for that, or maybe a 
> misconception about what increasing the number of retained checkpoints is good for?
> 
> Best,
> Stefan 
> 
> 

Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-15 Thread Piotr Nowojski
Hi,

Could you post full job manager and task manager logs from startup until the 
first signs of the problem?

Thanks, Piotrek

> On 15 Jan 2018, at 11:21, Reza Samee  wrote:
> 
> Thanks for the response, 
> and sorry for the delay.
> 
> The JobManager & TaskManager logged ports are open!
> 
> 
> Is this log OK?
> 2018-01-15 13:40:03,455 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under akka.tcp://flink@172.16.20.18:6123/user/jobmanager:null.
> 
> When I kill the task manager, the jobmanager logs:
> 2018-01-15 13:32:41,419 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@stage_dbq_1:45532] has failed, address is now gated for 
> [5000] ms. Reason: [Disassociated] 
> 
> But it does not decrement the number of available task managers!
> And when I start my single task manager again, it logs:
> 
> 2018-01-15 13:32:52,753 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered 
> TaskManager at ??? (akka://flink/deadLetters) as 
> 626846ae27a833cb094eeeb047a6a72c. Current number of registered hosts is 2. 
> Current number of alive task slots is 40.
> 
> 


Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-19 Thread Piotr Nowojski
Hi,

It seems like you have not opened some of the ports. As I pointed out in the 
first mail, please go through all of the config options regarding 
hostnames/ports (not only those that appear in the log files; maybe something 
is not being logged): 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#jobmanager-amp-taskmanager

jobmanager.rpc.port
taskmanager.rpc.port
taskmanager.data.port
blob.server.port 

And double check that they are accessible from the appropriate machines, ideally 
using some external tool like telnet or ncat. Your network can be configured to 
accept some connections only from specific hosts (like localhost). For example, 
in the case for which you attached the log files, did you check that the job 
manager host can open a connection to `stage_dbq_1:33633` (the task manager 
host and its rpc port; the rpc port is random by default)?

Also make sure that the configurations on the task manager and job manager are 
consistent.

Piotrek

> On 18 Jan 2018, at 08:41, Reza Samee  wrote:
> 
> Hi, 
> 
> I attached log file,
> 
> Thanks
> 

Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
Hi,

As far as I know, there is currently no simple way to do this. See “Join 
stream with static data” in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

and
https://issues.apache.org/jira/browse/FLINK-6131 


One workaround might be to buffer the Kafka input in state in your 
TwoInput operator until all of the broadcast messages have arrived.
Another option might be to start your application dynamically: first run some 
computation to determine the fixed list of ids, then start the Flink application 
with those values hardcoded or passed via command line arguments.
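
The buffering workaround could look roughly like this. It is a plain Java 
sketch of the logic only, not a Flink operator: the class BufferUntilLoaded and 
its method names are made up for illustration, in a real job the buffer would 
have to live in operator state, and you would still need some end marker on the 
first input to know that the preload has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical illustration of "buffer input 2 until input 1 is complete"; not Flink code.
public class BufferUntilLoaded {
    private final Set<String> preloadedIds = new HashSet<>();
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean preloadComplete = false;

    /** Input 1: an id from the fixed list. */
    public void onId(String id) {
        preloadedIds.add(id);
    }

    /** Input 1 has finished: release everything buffered so far, in arrival order. */
    public List<String> onPreloadComplete() {
        preloadComplete = true;
        List<String> out = new ArrayList<>();
        while (!buffered.isEmpty()) {
            out.add(process(buffered.poll()));
        }
        return out;
    }

    /** Input 2: a Kafka record; held back until the preload has completed. */
    public List<String> onRecord(String record) {
        List<String> out = new ArrayList<>();
        if (preloadComplete) {
            out.add(process(record));
        } else {
            buffered.add(record);
        }
        return out;
    }

    // Placeholder for the real processing that needs the preloaded ids.
    private String process(String record) {
        return record + (preloadedIds.contains(record) ? ":cached" : ":miss");
    }
}
```

Keep in mind that the buffer can grow without bound if the preload is slow, 
which is one more reason to consider the second option.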

Piotrek 

> On 25 Jan 2018, at 04:10, Ishwara Varnasi  wrote:
> 
> Hello,
> I have a scenario where I've two sources, one of them is source of fixed list 
> of ids for preloading (caching certain info which is slow) and second one is 
> the kafka consumer. I need to run Kafka after first one completes. I need a 
> mechanism to let the Kafka consumer know that it can start consuming 
> messages. How can I achieve this?
> thanks
> Ishwara Varnasi



Re: Avoiding deadlock with iterations

2018-01-25 Thread Piotr Nowojski
Hi,

This is a known problem and I don’t think there is an easy solution to it. 
Please refer to:
http://mail-archives.apache.org/mod_mbox/flink-user/201704.mbox/%3c5486a7fd-41c3-4131-5100-272825088...@gaborhermann.com%3E
 

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132 


Thanks,
Piotrek

> On 25 Jan 2018, at 05:36, Ken Krugler  wrote:
> 
> Hi all,
> 
> We’ve run into deadlocks with two different streaming workflows that have 
> iterations.
> 
> In both cases, the issue is with fan-out; if any operation in the loop can 
> emit more records than consumed, eventually a network buffer fills up, and 
> then everyone in the iteration loop is blocked.
> 
> One pattern we can use, when the operator that’s causing the fan-out has the 
> ability to decide how much to emit, is to have it behave as an async 
> function, emitting from a queue with multiple threads. If threads start 
> blocking because of back pressure, then the queue begins to fill up, and the 
> function can throttle back how much data it queues up. So this gives us a 
> small (carefully managed) data reservoir we can use to avoid the deadlock.
> 
> Is there a better approach? I didn’t see any way to determine how “full” the 
> various network buffers are, and use that for throttling. Plus there’s the 
> issue of partitioning, where it would be impossible in many cases to know the 
> impact of a record being emitted. So even if we could monitor buffers, I 
> don’t think it’s a viable solution.
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler 
> +1 530-210-6378
> 



Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

As you figured out, some dummy EOF record is one solution; however, you might 
also achieve it by wrapping an existing CSV function. Your wrapper could 
emit this dummy EOF record. Another (probably better) idea is to use 
Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or 
ContinuousFileReaderOperator will emit it for you, so you would just need to 
handle the Watermark.

The question is, do you need to perform the ACK operation AFTER all of the DB 
writes, or just after reading the CSV file? If the latter, you could add 
some custom ACK operator with parallelism one just after the CSV source that 
waits for the EOF Watermark. 

If it is the former (some kind of committing of the DB writes), you would need 
to wait until the EOF passes through all of your operators. You would need 
something like this:

parallelism 1 for source -> default parallelism for keyBy/enrichment/db writes 
-> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)

I hope this helps,
Piotrek

> On 24 Jan 2018, at 23:19, Vinay Patil  wrote:
> 
> Hi Guys,
> 
> Following is how my pipeline looks (DataStream API) :
> 
> [1] Read the data from the csv file
> [2] KeyBy it by some id
> [3] Do the enrichment and write it to DB
> 
> [1] reads the data in sequence as it has single parallelism and then I have 
> default parallelism for the other operators.
> 
> I want to generate a response (ack) when all the data of the file is 
> processed. How can I achieve this ?
> 
> One solution I can think of is to have EOF dummy record in a file and a 
> unique field for all the records in that file. Doing a keyBy on this field 
> will make sure that all records are sent to a single slot. So, when EOF  
> dummy records is read I can generate a response/ack.
> 
> Is there a better way I can deal with this ?
> 
> 
> Regards,
> Vinay Patil



Re: Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-25 Thread Piotr Nowojski
If you want to go this way, you could:
- as you proposed use some busy waiting with reading some file from a 
distributed file system
- wait for some network message (opening your own socket)
- use some other external system for this purpose: Kafka? Zookeeper?  

All of them seem hacky, though, and I would prefer (as I proposed before) to 
pre-compute those ids before starting the main Flink application. 
That would probably be simpler and easier to maintain.

Piotrek

> On 25 Jan 2018, at 13:47, Ishwara Varnasi  wrote:
> 
> FLIP-17 is promising. Until it’s available I’m planning to do this: 
> extend the Kafka consumer and add logic to hold off consuming until the other 
> source (fixed set) completes sending and those messages are processed by the 
> application. However, the question is how to let the Kafka consumer know 
> that it should now start consuming messages. What is the correct way to 
> broadcast messages to other tasks at runtime? I had success with the 
> distributed cache (i.e. write the status to a file in one task and have the 
> other look for the status in this file), but it doesn’t look like a good 
> solution, although it works. Thanks for the pointers.
> Ishwara Varnasi 
> 
> Sent from my iPhone
> 



Re: Send ACK when all records of file are processed

2018-01-25 Thread Piotr Nowojski
Hi,

If an operator has multiple inputs, its watermark will be the minimum of the 
watermarks of all of its inputs. Thus your hypothetical “ACK Operator” will get 
Watermark(Long.MAX_VALUE) only when all of the preceding operators have reported 
Watermark(Long.MAX_VALUE). 
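
The rule can be simulated in a few lines of plain Java. This is not Flink's 
actual implementation, only a sketch of the minimum logic; the class 
MinWatermarkTracker and its methods are hypothetical names:

```java
import java.util.Arrays;

// Hypothetical illustration of combining watermarks from several inputs; not Flink code.
public class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    /** numInputs must be at least 1. */
    public MinWatermarkTracker(int numInputs) {
        channelWatermarks = new long[numInputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true if the combined watermark advanced. */
    public boolean onWatermark(int input, long watermark) {
        channelWatermarks[input] = Math.max(channelWatermarks[input], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > current) {
            current = min;
            return true;
        }
        return false;
    }

    public long currentWatermark() {
        return current;
    }

    /** True once every input has reported Long.MAX_VALUE, i.e. the point where the ACK can be sent. */
    public boolean allInputsFinished() {
        return current == Long.MAX_VALUE;
    }
}
```

So with, say, three parallel upstream instances, the ACK operator sees nothing 
after the first two report Long.MAX_VALUE and sees exactly one combined 
watermark once the third one does.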

Yes, instead of simply adding a sink, you would have to use something like 
`flatMap` that doesn’t emit anything and only passes the watermark on (the 
default implementations do exactly that).

To access the watermark, you can use the DataStream.transform function and pass 
your own implementation of an operator extending AbstractStreamOperator. 
You would probably only need to override the processWatermark() method, and 
there you could do the ACK operation once you get 
org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.

Piotrek

> On 25 Jan 2018, at 17:56, Vinay Patil  wrote:
> 
> Hi Piotrek,
> 
> Thank you for your detailed answer.
> 
> Yes, I want to generate the ack when all the records of the file are written 
> to DB.
> 
> So to understand what you are saying , we will receive a single EOF watermark 
> value at the ack operator when all the downstream operator process all the 
> records of the file. But what I understand regarding the watermark is each 
> parallel instance of the operator will emit the watermark, so how do I ensure 
> that the EOF is reached  or will I receive only one watermark at the ack 
> operator ?
> 
> 
> So the pipeline topology will look like 
> 
> DataStream  readFileStream = env.readFile()
> 
> readFileStream
>  .transform(// ContinuousFileReaderOperator)
>  .keyBy(0)
>  .map(// enrichment)
>  .addSink(// DB)
> 
>  instead of add sink, should it be a  simple map operator which writes to DB 
> so that we can have a next ack operator which will generate the response.
> 
> Also, how do I get/access the Watermark value in the ack operator ? It will 
> be a simple  map operator, right ?
> 
> 
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> As you figured out, some dummy EOF record is one solution, however you might 
> try to achieve it also by wrapping an existing CSV function. Your wrapper 
> could emit this dummy EOF record. Another (probably better) idea is to use 
> Watermark(Long.MAX_VALUE) for the EOF marker. Stream source and/or 
> ContinuousFileReaderOperator will do that for you, so you would just need to 
> handle the Watermark.
> 
> The question is, do you need to perform the ACK operation AFTER all of the DB 
> writes, or just after reading the CSV file? If the latter one, you could add 
> some custom ACK operator with parallelism one just after the CSV source that 
> waits for the EOF Watermark. 
> 
> If it is the first one (some kind of committing the DB writes), you would 
> need to wait until the EOF passes through all of your operators. You would 
> need something like this:
> 
> parallelism 1 for source -> default parallelism for keyBy/enrichment/db 
> writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
> 
> I hope this helps,
> Piotrek
> 
>> On 24 Jan 2018, at 23:19, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>> 
>> Hi Guys,
>> 
>> Following is how my pipeline looks (DataStream API) :
>> 
>> [1] Read the data from the csv file
>> [2] KeyBy it by some id
>> [3] Do the enrichment and write it to DB
>> 
>> [1] reads the data in sequence as it has single parallelism and then I have 
>> default parallelism for the other operators.
>> 
>> I want to generate a response (ack) when all the data of the file is 
>> processed. How can I achieve this ?
>> 
>> One solution I can think of is to have EOF dummy record in a file and a 
>> unique field for all the records in that file. Doing a keyBy on this field 
>> will make sure that all records are sent to a single slot. So, when EOF  
>> dummy records is read I can generate a response/ack.
>> 
>> Is there a better way I can deal with this ?
>> 
>> 
>> Regards,
>> Vinay Patil
> 
> 



Re: Send ACK when all records of file are processed

2018-01-30 Thread Piotr Nowojski
When reading from input files, at the EOF event the readers will send 
Watermark(Long.MAX_VALUE) on all of their output edges and those watermarks will 
be propagated accordingly. So your ACK operator will get 
Watermark(Long.MAX_VALUE) only when it gets it from ALL of its input edges.

When reading from Kafka, you do not have an EOF event, so it would not be 
possible to use this Watermark(Long.MAX_VALUE). In that case you would need to 
emit some dummy EOF record, containing some meta information like the filename, 
with its event time correctly set to a value greater than that of the original 
event read from Kafka which contained the filename to process. You would have to 
pass this dummy EOF record to your EOF operator. There you would need to create 
some kind of mapping 

fileName -> event time marking EOF

Each time you process an EOF record, you add a new entry to this mapping. Then, 
whenever you process a watermark, you can check for which fileNames this 
watermark guarantees that the file has been processed completely.
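
A rough plain-Java sketch of such a mapping follows. It assumes that a file counts as complete once the watermark has reached the event time of its EOF record; the class `EofTracker` and its method names are hypothetical and not part of Flink. Completed entries are removed right away so that the mapping does not grow without bound.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper keeping the "fileName -> event time marking EOF" mapping.
final class EofTracker {
    // TreeMap only to get a deterministic iteration order by file name.
    private final Map<String, Long> eofTimeByFile = new TreeMap<>();

    // Called for every dummy EOF record.
    void onEofRecord(String fileName, long eofEventTime) {
        eofTimeByFile.put(fileName, eofEventTime);
    }

    // Called for every watermark; returns the files that this watermark
    // proves complete and drops them from the mapping.
    List<String> onWatermark(long watermark) {
        List<String> completed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = eofTimeByFile.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                completed.add(entry.getKey());
                it.remove();
            }
        }
        return completed;
    }

    int pendingFiles() {
        return eofTimeByFile.size();
    }
}
```

In a real job this mapping would have to live in Flink state to survive failures; the sketch leaves that out.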

However, this is more complicated and you would have to handle things like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when 
reading from a file, EOF immediately emits Watermark(Long.MAX_VALUE), which might 
not always be the case for Kafka: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission)

Piotrek

> On 30 Jan 2018, at 15:17, Vinay Patil  wrote:
> 
> Yeh, so this is the current implementation.
> 
> One question regarding the Watermark, since watermark is chosen as minimum 
> value of all of input streams, only one input  stream will have watermark 
> value to LONG.MAX_VALUE which denotes the EOF processing whereas the other 
> streams will not have this value , is my understanding right ? So in this 
> case LONG.MAX_VALUE will always be a greater value than it's input streams. 
> Or the LONG.MAX_VALUE watermark will flow from each input stream ?
> 
> 
> I was thinking of directly reading from Kafka as source in Flink in order to 
> remove the middle layer of independent Kafka Consumer which is triggering 
> Flink job.
> 
> So, the pipeline will be 1. readFrom Kafka -> take the File location -> read 
> using FileReaderOperator
> 
> But in this case how do I determine for which File I have received the 
> LONG.MAX_VALUE, it will get complicated.
> 
> 
> 
> Regards,
> Vinay Patil
> 
> On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Thanks for the clarification :)
> 
> Since you have one job per ACK, you can just rely on 
> Watermark(Long.MAX_VALUE) to mark the end of the processing.
> 
> A more complicated solution (compared to what I proposed before) would be 
> needed if you had one long-living job (for example running for multiple weeks) 
> that needed to produce multiple ACKs at different points in time.
> 
> Piotrek
> 
> 
>> On 29 Jan 2018, at 15:43, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>> 
>> Sure, here is the complete design that we have :
>> 
>> File metadata (NFS location of file) is stored in kafka , we are having a 
>> Kafka Consumer (not flink one) which will read from each partition and 
>> trigger a Flink job on cluster. 
>> 
>> The Flink job will then read from a file and do the processing as I 
>> mentioned earlier.
>> 
>> The requirement here is we need to trigger a ACK if the validations for all 
>> the records in a file are successful.
>> 
>> P.S I know we are not using Kafka to its full potential and are just using 
>> it for storing metadata :) 
>> 
>> Regards,
>> Vinay Patil
>> 
>> On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Could you rephrase what is your concern? 
>> 
>> Thanks, Piotrek
>> 
>> 
>>> On 25 Jan 2018, at 18:54, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> No, to clarify I need to send the ack for each file when it gets processed 
>>> completely and there are multiple files that I am going to read from the 
>>> shared location.
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>>> On Thu, Jan 25, 2018 at 11:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>> 
>>>> Yes, make sense. Just looked at the co

Re: Latest version of Kafka

2018-02-02 Thread Piotr Nowojski
Hi,

As of now, the newest Kafka connector that Flink provides is for Kafka 0.11, 
which uses the KafkaClient in version 0.11.x. However, you should be able to use 
it for reading from/writing to Kafka 1.0 - Kafka claims (and as far as I know 
it’s true) that Kafka 1.0 is backward compatible with 0.11. 

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden  wrote:
> 
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I 
> see the last version of Kafka supported is 0.11 , from documentation, but has 
> any testing been done with Kafka 1.0?
> 
> 
> Hayden Marchant
> 



Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi,

`FileProcessingMode.PROCESS_CONTINUOUSLY` processes the file continuously - the 
stream will not end. 

A simple `writeAsCsv(…)`, on the other hand, only flushes the output file at the 
end of the stream (see `OutputFormatSinkFunction`).

You can either use `PROCESS_ONCE` mode or use a more advanced data sink:
- BucketingSink
- re-use `writeAsCsv(…)` code by extending OutputFormatSinkFunction and 
implementing `CheckpointedFunction` to flush on snapshots (for at-least-once)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support 
`exactly-once`)
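
The difference between "flush only at the end of the stream" and "flush on snapshots" can be illustrated with a small plain-Java model. It is only a sketch: `SnapshotFlushingSink` is a hypothetical class, the StringBuilder `file` stands in for the real output file, and in Flink the flush would be done in the `snapshotState` method of `CheckpointedFunction`.

```java
// Plain-Java model of a buffering sink. Hypothetical class for illustration;
// it does not use or extend any Flink class.
final class SnapshotFlushingSink {
    private final StringBuilder pending = new StringBuilder(); // in-memory write buffer
    private final StringBuilder file = new StringBuilder();    // stands in for the file on disk

    // Called once per record: the line only reaches the buffer.
    void invoke(String csvLine) {
        pending.append(csvLine).append('\n');
    }

    // Called on every checkpoint: everything buffered so far becomes visible.
    void snapshotState() {
        flush();
    }

    // Called at the end of the stream, which never happens for an endless stream.
    void close() {
        flush();
    }

    private void flush() {
        file.append(pending);
        pending.setLength(0);
    }

    String fileContents() {
        return file.toString();
    }
}
```

Without the flush in snapshotState(), the last records of an endless stream would stay in the buffer forever, which matches the truncated last line seen in the output file.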

Piotrek

> On 2 Feb 2018, at 18:32, geoff halmo  wrote:
> 
> Hi Flink community:
> 
> I am testing Flink but can't write the final(18 or so elements out to disk)
> 
> Setup:
> Using NYC yellow taxi from data 2017-09.csv, I sorted the data on
> pickup_datetime in bash. I am working in event time.
> 
> Skeleton program:
> val ds = senv.readFile(input_format, input_path,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
> 
> ds.flatMap(row => parse(row))
> .assignAscendingTimestamps( _.datetime)
> .timeWindowAll(Time.hours(1))
> .process( new MyProcessAllWIndowFunction() )
> .writeCsv
> 
> Issue:
> The last line is a half line:
> tail -n1 output.csv
> 150655320,2017-09-27T:19:00-4:00[user@computer]
> 
> When I use .print instead of .writeCsv, the last line on console is
> 150682680,2017-09-30T23:00-400[America/New_York],21353



Re: Reduce parallelism without network transfer.

2018-02-05 Thread Piotr Nowojski
Hi,

It should work like this out of the box if you use the `rescale` method:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
 


If it does not work, please let us know.

Piotrek

> On 3 Feb 2018, at 04:39, Kien Truong  wrote:
> 
> Hi,
> 
> Assuming that I have a streaming job, using 30 task managers with 4 slot 
> each. I want to change the parallelism of 1 operator from 120 to 30. Are 
> there anyway so that each subtask of this operator get data from 4 upstream 
> subtasks running in the same task manager, thus avoiding network completely ?
> 
> Best regards, 
> Kien
> 
> Sent from TypeApp 


Re: Global window keyBy

2018-02-05 Thread Piotr Nowojski
Hi,

FIRE_AND_PURGE triggers a call to `org.apache.flink.api.common.state.State#clear()`, 
which “Removes the value mapped under the current key.” So the state of other 
keys should remain unmodified. 
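
As a rough illustration of "only the current key is cleared", here is a plain-Java model of per-key window contents. The class `KeyedWindowState` and its methods are hypothetical and only mimic the behaviour described above; they are not Flink's state classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of window contents that are kept separately per key.
final class KeyedWindowState {
    private final Map<String, List<Integer>> contents = new HashMap<>();

    void add(String key, int value) {
        contents.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Models FIRE_AND_PURGE for one key: hand out its elements,
    // then clear the state of that key only.
    List<Integer> fireAndPurge(String key) {
        List<Integer> fired = contents.remove(key);
        return fired == null ? new ArrayList<>() : fired;
    }

    int size(String key) {
        List<Integer> values = contents.get(key);
        return values == null ? 0 : values.size();
    }
}
```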

I hope this solves your problem/question?

Piotrek

> On 4 Feb 2018, at 15:39, miki haiat  wrote:
> 
> Im using trigger   and a  guid in order to key stream .
> 
> I have  some problem to understand how to clear the window .
> 
> FIRE_AND_PURGE   in trigger  will remove the keyd data only ?
> if fire and purge is removing all the data then i need to implement it more 
> like this  example
> 
> https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/DrivingSegments.java
>  
> 
> 
> Evictor is used in order to clear the data by time stamp  but how can i clear 
> the data  by the key  also ?
> 
> 
> thanks ,
> 
> Miki



Re: Getting Key from keyBy() in ProcessFunction

2018-02-05 Thread Piotr Nowojski
I think it’s currently not easily possible, however it might be a valid 
suggestion to add an `OnTimerContext#getCurrentKey()` method. 

Besides using ValueState as you discussed before, as a kind of workaround you 
could copy KeyedProcessOperator and modify it to suit your needs, but this 
would be more complicated.

Piotrek

> On 4 Feb 2018, at 20:36, Ken Krugler  wrote:
> 
> Hi Jürgen,
> 
> That makes sense to me.
> 
> Anyone from the Flink team want to comment on (a) if there is a way to get 
> the current key in the timer callback without using an explicit ValueState 
> that’s maintained in the processElement() method, and (b) if not, whether 
> that could be added to the context?
> 
> Thanks,
> 
> — Ken
> 
> 
>> On Feb 4, 2018, at 6:14 AM, Jürgen Thomann wrote:
>> 
>> Hi Ken,
>> 
>> thanks for your answer. You're right and I'm doing it already that way. I 
>> just hoped that I could avoid the ValueState (I'm using a MapState as well 
>> already, which does not store the key) and get the key from the provided 
>> Context of the ProcessFunction. This would avoid having the ValueState and 
>> setting it in the processElement just to know the key in the onTimer 
>> function. 
>> In the current way I have to check the ValueState for every element if the 
>> key is already set or just set it every time again the processElement method 
>> is invoked.
>> 
>> Best,
>> Jürgen
>> 
>> On 02.02.2018 18:37, Ken Krugler wrote:
>>> Hi Jürgen,
>>> 
>>>> On Feb 2, 2018, at 6:24 AM, Jürgen Thomann wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I'm currently using a ProcessFunction after a keyBy() and can't find a way 
>>>> to get the key.
>>> 
>>> Doesn’t your keyBy() take a field (position, or name) to use as the key?
>>> 
>>> So then that same field contains the key in the 
>>> ProcessFunction.processElement(in, …) parameter, yes?
>>> 
>>>> I'm currently storing it in a ValueState within processElement
>>> 
>>> If you’re using a ValueState, then there’s one of those for each unique 
>>> key, not one for the operation.
>>> 
>>> I.e. the ValueState for key = “one” is separate from the ValueState for key 
>>> = “two”.
>>> 
>>> You typically store the key in the state so it’s accessible in the onTimer 
>>> method.
>>> 
>>>> and set it all the time, so that I can access it in onTimer(). Is there a 
>>>> better way to get the key? We are using Flink 1.3 at the moment.
>>> 
>>> The ValueState (what you used in processElement) that you’re accessing in 
>>> the onTimer() method is also scoped by the current key.
>>> 
>>> So assuming you stored the key in the state inside of your processElement() 
>>> call, then you should have everything you need.
>>> 
>>> — Ken
>>> 
>>> PS - Check out 
>>> https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
>>>  
>>> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is the receiving side 
of an RPC call that is initiated on the sender side by 
org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.

In other words, JobMasterGateway.offerSlots is called by a TaskManager and it 
is how a TaskManager advertises its slots to a JobManager.

Piotrek

> On 5 Feb 2018, at 08:38, mingleizhang  wrote:
> 
> I find some codes in flink does not make sense to me. Like in some classes 
> below
> 
> JobMasterGateway.java has a offerSlots method which means Offers the given 
> slots to the job manager. I was wondering why a jobmanager running should 
> need slots ?
> TaskExecutor.java has a offerSlotsToJobManager method which means offer slots 
> to jobmanager.
> 
> Above both are confuse me. I just know that Task running needs slots which 
> support by a taskManager. Does anyone let me why what does jobmanager needs 
> slots mean ?
> 
> Thanks in advance.
> Rice.
> 
>  
> 
> 
>  



Re: Checkpoint is not triggering as per configuration

2018-02-05 Thread Piotr Nowojski
Hi,

Did you check task manager and job manager logs for any problems?

Piotrek

> On 5 Feb 2018, at 03:19, syed  wrote:
> 
> Hi
> I am new to the flink world, and trying to understand. Currently, I am using
> Flink 1.3.2 on a small cluster of 4 nodes, 
> I have configured checkpoint directory at HDFS, and run streaming word count
> example with my own custom input file of 63M entries, 
> I enabled checkpoint every one second {/env.enableCheckpointing(1000)/}
> 
> The problem I am facing is checkpoint is only triggered once after 1 second,
> but no checkpoint afterwards, I run application for more than 5 minutes, but
> checkpoint history shows only 1 checkpoint triggered and was successful. I
> don't know why checkpoint not triggering after every second?
> Please suggest me what is wrong?
> Thanks in anticipation.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
I might be wrong, but I think it is the other way around and the naming of this 
method is correct - it does exactly what it says. A TaskManager comes with some 
predefined task slots and it is the one that offers them to a JobManager. 
The JobManager can use those slot offers to (later!) schedule tasks. 
(#offerSlotsToJobManager() is called during TaskManager initialisation).
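
The direction of the call can be modelled in a few lines of plain Java. This is only a toy model of the interaction described above: the classes below are hypothetical, share nothing with Flink's real JobMaster and TaskExecutor except the two method names, and ignore RPC, slot IDs and failure handling.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: the task manager owns the slots and offers them,
// the job manager collects the offers and uses them later for scheduling.
final class SlotOfferModel {

    static final class JobManager {
        private final Deque<String> freeSlots = new ArrayDeque<>();

        // Receiving side of the call: invoked by a task manager.
        void offerSlots(String taskManagerId, int numberOfSlots) {
            for (int i = 0; i < numberOfSlots; i++) {
                freeSlots.add(taskManagerId + "/slot-" + i);
            }
        }

        // Happens later: a previously offered slot is used to run a task.
        String scheduleTask(String taskName) {
            String slot = freeSlots.poll();
            if (slot == null) {
                throw new IllegalStateException("no free slot for " + taskName);
            }
            return taskName + " -> " + slot;
        }

        int freeSlotCount() {
            return freeSlots.size();
        }
    }

    static final class TaskManager {
        private final String id;
        private final int numberOfSlots;

        TaskManager(String id, int numberOfSlots) {
            this.id = id;
            this.numberOfSlots = numberOfSlots;
        }

        // Sending side of the call: invoked during task manager start-up.
        void offerSlotsToJobManager(JobManager jobManager) {
            jobManager.offerSlots(id, numberOfSlots);
        }
    }
}
```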

Piotrek

> On 5 Feb 2018, at 10:44, mingleizhang  wrote:
> 
> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
> sounds confuse to me. It might be better to rename it to 
> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. I 
> just feel like offerSlotToJobManager sounds strange.. What do you think of 
> this ?
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:30:32, "Piotr Nowojski"  wrote:
> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side of 
> an RPC call that is being initiated on the sender side: 
> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
> 
> In other words, JobMasterGateway.offerSlots is called by a TaskManager and it 
> is a way how TaskManager is advertising his slots to a JobManager.
> 
> Piotrek
> 
>> On 5 Feb 2018, at 08:38, mingleizhang > <mailto:zml13856086...@163.com>> wrote:
>> 
>> I find some codes in flink does not make sense to me. Like in some classes 
>> below
>> 
>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>> slots to the job manager. I was wondering why a jobmanager running should 
>> need slots ?
>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>> slots to jobmanager.
>> 
>> Above both are confuse me. I just know that Task running needs slots which 
>> support by a taskManager. Does anyone let me why what does jobmanager needs 
>> slots mean ?
>> 
>> Thanks in advance.
>> Rice.
>> 
>>  
>> 
>> 
>>  
> 
> 
> 
>  



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
It seems so - but I’m saying this based only on the annotations showing when 
this method was added (in the last couple of months). I’m not that familiar 
with those parts of the code.

Piotrek

> On 5 Feb 2018, at 10:51, mingleizhang  wrote:
> 
>  Makes sense to me now. Is it a new design at FLIP6 ?
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:49:05, "Piotr Nowojski"  wrote:
> I might be wrong but I think it is other way around and the naming of this 
> method is correct - it does exactly what it says. TaskManager comes with some 
> predefined task slots and it is the one that is offering them to a 
> JobManager. JobManager can use those slots offers to (later!) schedule tasks. 
> (#offerSlotsToJobManager() is being called during TaskManager initialisation).
> 
> Piotrek
> 
>> On 5 Feb 2018, at 10:44, mingleizhang > <mailto:zml13856086...@163.com>> wrote:
>> 
>> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
>> sounds confuse to me. It might be better to rename it to 
>> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. 
>> I just feel like offerSlotToJobManager sounds strange.. What do you think of 
>> this ?
>> 
>> Rice.
>> 
>> 
>> 
>> 
>> 
>> At 2018-02-05 17:30:32, "Piotr Nowojski" > <mailto:pi...@data-artisans.com>> wrote:
>> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side 
>> of an RPC call that is being initiated on the sender side: 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
>> 
>> In other words, JobMasterGateway.offerSlots is called by a TaskManager and 
>> it is a way how TaskManager is advertising his slots to a JobManager.
>> 
>> Piotrek
>> 
>>> On 5 Feb 2018, at 08:38, mingleizhang >> <mailto:zml13856086...@163.com>> wrote:
>>> 
>>> I find some codes in flink does not make sense to me. Like in some classes 
>>> below
>>> 
>>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>>> slots to the job manager. I was wondering why a jobmanager running should 
>>> need slots ?
>>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>>> slots to jobmanager.
>>> 
>>> Above both are confuse me. I just know that Task running needs slots which 
>>> support by a taskManager. Does anyone let me why what does jobmanager needs 
>>> slots mean ?
>>> 
>>> Thanks in advance.
>>> Rice.
>>> 
>>>  
>>> 
>>> 
>>>  
>> 
>> 
>> 
>>  
> 
> 
> 
>  



Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
Yes, but this issue is still a part of the FLIP-6 work.

Piotrek

> On 5 Feb 2018, at 11:01, mingleizhang  wrote:
> 
> I found a website: https://issues.apache.org/jira/browse/FLINK-4360 
> <https://issues.apache.org/jira/browse/FLINK-4360> implemented this before.
> 
> Rice.
> 
> 
> 
> 
> 
> At 2018-02-05 17:56:49, "Piotr Nowojski"  wrote:
> It seems so - but I’m saying this only basing on a annotations when this 
> method was added (in the last couple of months). I’m not that much familiar 
> with those code parts.
> 
> Piotrek
> 
>> On 5 Feb 2018, at 10:51, mingleizhang > <mailto:zml13856086...@163.com>> wrote:
>> 
>>  Makes sense to me now. Is it a new design at FLIP6 ?
>> 
>> Rice.
>> 
>> 
>> 
>> 
>> 
>> At 2018-02-05 17:49:05, "Piotr Nowojski" > <mailto:pi...@data-artisans.com>> wrote:
>> I might be wrong but I think it is other way around and the naming of this 
>> method is correct - it does exactly what it says. TaskManager comes with 
>> some predefined task slots and it is the one that is offering them to a 
>> JobManager. JobManager can use those slots offers to (later!) schedule 
>> tasks. (#offerSlotsToJobManager() is being called during TaskManager 
>> initialisation).
>> 
>> Piotrek
>> 
>>> On 5 Feb 2018, at 10:44, mingleizhang >> <mailto:zml13856086...@163.com>> wrote:
>>> 
>>> Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager 
>>> sounds confuse to me. It might be better to rename it to 
>>> requestSlotsFromJobManager. I dont know whether it is sounds OKay for that. 
>>> I just feel like offerSlotToJobManager sounds strange.. What do you think 
>>> of this ?
>>> 
>>> Rice.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> At 2018-02-05 17:30:32, "Piotr Nowojski" >> <mailto:pi...@data-artisans.com>> wrote:
>>> org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side 
>>> of an RPC call that is being initiated on the sender side: 
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager.
>>> 
>>> In other words, JobMasterGateway.offerSlots is called by a TaskManager and 
>>> it is a way how TaskManager is advertising his slots to a JobManager.
>>> 
>>> Piotrek
>>> 
>>>> On 5 Feb 2018, at 08:38, mingleizhang >>> <mailto:zml13856086...@163.com>> wrote:
>>>> 
>>>> I find some codes in flink does not make sense to me. Like in some classes 
>>>> below
>>>> 
>>>> JobMasterGateway.java has a offerSlots method which means Offers the given 
>>>> slots to the job manager. I was wondering why a jobmanager running should 
>>>> need slots ?
>>>> TaskExecutor.java has a offerSlotsToJobManager method which means offer 
>>>> slots to jobmanager.
>>>> 
>>>> Above both are confuse me. I just know that Task running needs slots which 
>>>> support by a taskManager. Does anyone let me why what does jobmanager 
>>>> needs slots mean ?
>>>> 
>>>> Thanks in advance.
>>>> Rice.
>>>> 
>>>>  
>>>> 
>>>> 
>>>>  
>>> 
>>> 
>>> 
>>>  
>> 
>> 
>> 
>>  
> 
> 
> 
>  



Re: Reduce parallelism without network transfer.

2018-02-06 Thread Piotr Nowojski
Hi,

Rebalance is a safer default setting that protects against data skew. Even the 
smallest data skew can create a bottleneck much larger than the 
serialisation/network transfer cost, especially if one changes the parallelism 
to a value that’s not a result of multiplication or division (like N down to 
N-1). Data skew can be arbitrarily large, while the overhead of rebalance 
compared to rescale is limited.
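
The "N down to N-1" case can be made concrete with a small load model in plain Java. It rests on simplifying assumptions that are mine and not taken from Flink's code: with the rescale-like scheme each upstream subtask sends everything to the single downstream subtask floor(i * down / up), with the rebalance-like scheme each upstream subtask spreads its records evenly over all downstream subtasks, and the model only covers reducing the parallelism.

```java
// Simplified load model for reducing parallelism. Hypothetical helper,
// not a reproduction of Flink's partitioners.
final class PartitioningModel {

    // rescale-like: upstream subtask i feeds only downstream subtask floor(i * down / up).
    static long[] rescaleLoads(long[] upstreamRecords, int downstream) {
        long[] loads = new long[downstream];
        int upstream = upstreamRecords.length;
        for (int i = 0; i < upstream; i++) {
            loads[(int) ((long) i * downstream / upstream)] += upstreamRecords[i];
        }
        return loads;
    }

    // rebalance-like: every upstream subtask distributes round-robin over all downstream subtasks.
    static long[] rebalanceLoads(long[] upstreamRecords, int downstream) {
        long[] loads = new long[downstream];
        for (long records : upstreamRecords) {
            for (int d = 0; d < downstream; d++) {
                loads[d] += records / downstream + (d < records % downstream ? 1 : 0);
            }
        }
        return loads;
    }
}
```

For four upstream subtasks with 300 records each and three downstream subtasks, the rescale-like scheme in this model gives one downstream subtask twice the load of the others, while the rebalance-like scheme gives every subtask 400 records. Going from 120 to 30 divides evenly, so both schemes are balanced there.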

Piotrek


> On 6 Feb 2018, at 04:32, Kien Truong  wrote:
> 
> Thanks Piotr, it works.
> May I ask why default behavior when reducing parallelism is rebalance, and 
> not rescale ?
> 
> Regards,
> Kien
> 
> Sent from TypeApp <http://www.typeapp.com/r?b=11979>
> On Feb 5, 2018, at 15:28, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> It should work like this out of the box if you use rescale method:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning>
> 
> If it will not work, please let us know.
> 
> Piotrek 
> 
>> On 3 Feb 2018, at 04:39, Kien Truong < duckientru...@gmail.com 
>> <mailto:duckientru...@gmail.com>> wrote:
>> 
>> Hi, 
>> 
>> Assuming that I have a streaming job, using 30 task managers with 4 slot 
>> each. I want to change the parallelism of 1 operator from 120 to 30. Are 
>> there anyway so that each subtask of this operator get data from 4 upstream 
>> subtasks running in the same task manager, thus avoiding network completely 
>> ? 
>> 
>> Best regards, 
>> Kien 
>> 
>> Sent from TypeApp <http://www.typeapp.com/r?b=11979>



Re: Rebalance to subtasks in same TaskManager instance

2018-02-06 Thread Piotr Nowojski
Hi,

Unfortunately I don’t think it’s currently possible in Flink. Please feel 
free to submit a feature request for it on our JIRA: 
https://issues.apache.org/jira/projects/FLINK/summary 

Have you tried out the setup using rebalance? In most cases the overhead of 
rebalance over rescale is not as high as one might think.

Piotrek

> On 5 Feb 2018, at 15:16, johannes.barn...@clarivate.com wrote:
> 
> Hi,
> 
> I have a streaming topology with source parallelism of M and a target
> operator parallelism of N.
> For optimum performance I have found that I need to choose M and N
> independently.
> Also, the source subtasks do not all produce the same number of records and
> therefor I have to rebalance to the target operator to get optimum
> throughput.
> 
> The record sizes vary a lot (up to 10MB) but are about 200kB on average.
> 
> Through experimentation using the rescale() operator I have found that
> maximum throughput can be significantly increased if I restrict this
> rebalancing to target subtasks within the same TaskManager instances.
> 
> However I cannot use rescale for this purpose as it does not do a
> rebalancing to all target subtasks in the instance.
> 
> I was hoping to use a custom Partitioner to achieve this but it is not clear
> to me which partition would correspond to which subTask.
> 
> Is there any way currently to achieve this with Flink? 
> 
> If it helps I believe the feature I am hoping to achieve is similar to
> Storm's "Local or shuffle grouping".
> 
> Any help or suggestions will be appreciated.
> Hans
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Ceph configuration for checkpoints?

2018-02-13 Thread Piotr Nowojski
Hi,

Have you tried referring to the Ceph documentation? 
http://docs.ceph.com/docs/jewel/cephfs/hadoop/ 
It claims to be:

> a drop-in replacement for the Hadoop File System (HDFS)

So I would first try to configure Ceph according to its documentation and then 
try to use Flink’s built-in Hadoop connector.

Piotrek

> On 12 Feb 2018, at 20:11, Julio Biason  wrote:
> 
> Hello people,
> 
> I'm looking for the configuration options required to use Ceph as a backend 
> for checkpoints.
> 
> So far, I can see that `state.backend.fs.checkpointdir` and 
> `state.checkpoints.dir` should be something like 
> `ceph:///my-checkpoint-bucket`. But where do I define server and acess and 
> secret keys? I couldn't find anything remotely related to that in the docs...
> 
> -- 
> Julio Biason, Sofware Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51  
> 99907 0554



Re: Python and Scala

2018-02-13 Thread Piotr Nowojski
Hi,

1. Flink’s Python Batch API is not complete and it’s not on par with the Scala API.
2. As of now there is no Python API for Flink Streaming, however there is 
some ongoing work on that: https://issues.apache.org/jira/browse/FLINK-5886 
3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for that: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream


Piotrek

> On 13 Feb 2018, at 13:21, Esa Heikkinen  wrote:
> 
>  
> What the difference is to use Python and Scala in Flink ?
> Can I do all the same things with Python and Scala ? For examples CEP with 
> files.



Re: Python and Scala

2018-02-14 Thread Piotr Nowojski
Hi

The Scala REPL uses the same code as the compiled library, so they should work 
the same.

Piotrek

> On 13 Feb 2018, at 18:32, Esa Heikkinen  wrote:
> 
> Hi
> 
> And what about the differences between Scala REPL and Scala (compiled) ?
> Esa
> 
> Piotr Nowojski kirjoitti 13.2.2018 klo 15:14:
>> Hi,
>> 
>> 1. Flink’s Python Batch API is not complete and it’s not on pair with Scala.
>> 2. As for know there is no Python API for Flink Streaming, however there is 
>> some ongoing work with that: 
>> https://issues.apache.org/jira/browse/FLINK-5886 
>> <https://issues.apache.org/jira/browse/FLINK-5886>
>> 3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for 
>> that: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream>
>> 
>> Piotrek
>> 
>>> On 13 Feb 2018, at 13:21, Esa Heikkinen >> <mailto:esa.heikki...@student.tut.fi>> wrote:
>>> 
>>>  
>>> What the difference is to use Python and Scala in Flink ?
>>> Can I do all the same things with Python and Scala ? For examples CEP with 
>>> files.
>> 
> 



Re: Python and Scala

2018-02-14 Thread Piotr Nowojski
Hi,

I have never used it before, but it’s described in the documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/scala_shell.html

See the “Adding external dependencies” part. It should answer this question.

Piotrek

> On 14 Feb 2018, at 10:15, Esa Heikkinen  wrote:
> 
> Hi
>  
> Good news. Is it way to supply Scala-code from file to REPL ?
>  
> It seems the compiling is too complicated operation.. Actually I don’t get it 
> to work yet.
>  
> Esa
>  
> From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
> Sent: Wednesday, February 14, 2018 10:55 AM
> To: Esa Heikkinen 
> Cc: Esa Heikkinen ; user@flink.apache.org
> Subject: Re: Python and Scala
>  
> Hi
>  
> Scala REPL uses the same code as compiled library so they should work the 
> same.
>  
> Piotrek
> 
> 
> On 13 Feb 2018, at 18:32, Esa Heikkinen  <mailto:heikk...@student.tut.fi>> wrote:
>  
> Hi
> And what about the differences between Scala REPL and Scala (compiled) ?
> Esa
> 
> Piotr Nowojski kirjoitti 13.2.2018 klo 15:14:
> Hi, 
>  
> 1. Flink’s Python Batch API is not complete and it’s not on pair with Scala.
> 2. As for know there is no Python API for Flink Streaming, however there is 
> some ongoing work with that: https://issues.apache.org/jira/browse/FLINK-5886 
> <https://issues.apache.org/jira/browse/FLINK-5886>
> 3. CEP doesn’t work with Flink Batch, you have to use Flink Streaming for 
> that: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#dataset-and-datastream>
>  
> Piotrek
> 
> 
> On 13 Feb 2018, at 13:21, Esa Heikkinen  <mailto:esa.heikki...@student.tut.fi>> wrote:
>  
>  
> What the difference is to use Python and Scala in Flink ?
> Can I do all the same things with Python and Scala ? For examples CEP with 
> files.



Re: Slow Flink program

2018-03-01 Thread Piotr Nowojski
Hi,

First of all, learn what’s going on with your job: check the status of the 
machines and the CPU/network usage on the cluster. If CPU is not ~100%, analyse 
what is preventing the machines from working faster (network bottleneck, 
locking, blocking operations etc). If CPU is ~100%, profile the TaskManagers to 
see what you can speed up.

A couple of questions about your example:
- You create CollectiveData instances with size 128000 by default. Doesn’t it 
mean that your records are gigantic? I cannot tell, since you didn’t provide 
the full code.
- You are mapping the data to new Tuple2(0, s) and then keying by the first 
field, which is always 0. Probably all of the records are ending up on one 
single machine.
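
To illustrate the second point, here is a minimal plain-Java sketch of what a 
constant key does to the distribution of records. It is only a simplified 
model: subtaskFor() is a hypothetical stand-in (hash modulo parallelism) and 
not Flink's actual key-group assignment, and the record count is arbitrary.

```java
import java.util.Arrays;

// Simplified model of hash partitioning; NOT Flink's real key-group assignment.
class KeySkewSketch {

    // Hypothetical stand-in for "which parallel subtask receives this key".
    static int subtaskFor(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Counts how many records each subtask would receive for the given keys.
    static long[] recordsPerSubtask(int[] keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (int key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    // Number of subtasks that receive at least one record.
    static long busySubtasks(long[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        int parallelism = 640; // 32 nodes x 20 tasks, as in the question
        int records = 10_000;  // arbitrary number, for illustration only

        int[] constantKeys = new int[records]; // every key is 0, as with new Tuple2(0, s)
        int[] spreadKeys = new int[records];
        for (int n = 0; n < records; n++) {
            spreadKeys[n] = n; // hypothetical well-distributed key
        }

        System.out.println("constant key -> busy subtasks: "
                + busySubtasks(recordsPerSubtask(constantKeys, parallelism)));
        System.out.println("spread keys  -> busy subtasks: "
                + busySubtasks(recordsPerSubtask(spreadKeys, parallelism)));
    }
}
```

With a constant key only one of the 640 subtasks ever gets work in this model, 
so the reduce step cannot benefit from the cluster size.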

Piotrek

> On 28 Feb 2018, at 17:20, Supun Kamburugamuve  wrote:
> 
> Hi, 
> 
> I'm trying to run a simple benchmark on Flink streaming reduce. It seems it 
> is very slow. Could you let me know if I'm doing something wrong.
> 
> Here is the program. I'm running this on 32 nodes with 20 tasks in each node. 
> So the parallelism is at 640.
> 
> public class StreamingReduce {
>   int size;
>   int iterations;
>   StreamExecutionEnvironment env;
>   String outFile;
> 
>   public StreamingReduce(int size, int iterations, StreamExecutionEnvironment env,
>       String outFile) {
>     this.size = size;
>     this.iterations = iterations;
>     this.env = env;
>     this.outFile = outFile;
>   }
> 
>   public void execute() {
>     DataStream<CollectiveData> stringStream = env.addSource(
>         new RichParallelSourceFunction<CollectiveData>() {
>       int i = 1;
>       int count = 0;
>       int size = 0;
>       int iterations = 1;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool p = (ParameterTool)
>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         size = p.getInt("size", 128000);
>         iterations = p.getInt("itr", 1);
>         System.out.println(" iterations: " + iterations + " size: " + size);
>       }
> 
>       @Override
>       public void run(SourceContext<CollectiveData> sourceContext) throws Exception {
>         while (count < iterations) {
>           CollectiveData i = new CollectiveData(size);
>           sourceContext.collect(i);
>           count++;
>         }
>       }
> 
>       @Override
>       public void cancel() {
>       }
>     });
> 
>     stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
>       @Override
>       public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
>         return new Tuple2<>(0, s);
>       }
>     }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
>       @Override
>       public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> c1,
>           Tuple2<Integer, CollectiveData> c2) throws Exception {
>         return new Tuple2<>(0, add(c1.f1, c2.f1));
>       }
>     }).addSink(new RichSinkFunction<Tuple2<Integer, CollectiveData>>() {
>       long start;
>       int count = 0;
>       int iterations;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool p = (ParameterTool)
>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         iterations = p.getInt("itr", 1);
>         System.out.println(" iterations: " + iterations);
>       }
> 
>       @Override
>       public void invoke(Tuple2<Integer, CollectiveData> integerStringTuple2)
>           throws Exception {
>         if (count == 0) {
>           start = System.nanoTime();
>         }
>         count++;
>         if (count >= iterations) {
>           System.out.println("Final: " + count + " " + (System.nanoTime() - start) / 100
>               + " " + (integerStringTuple2.f1));
>         }
>       }
>     });
> 
>   }
> 
>   private static CollectiveData add(CollectiveData i, CollectiveData j) {
>     List r= new ArrayList<>();
>     for (int k = 0; k < i.getList().size(); k++) {
>       r.add((i.getList().get(k) + j.getList().get(k)));
>     }
>     return new CollectiveData(r);
>   }
> }
> Thanks,
> Supun..
> 
> 



Re: Hi Flink Team

2018-03-01 Thread Piotr Nowojski
Hi,

timeWindowAll is a non-parallel operation, since it gathers all of the elements 
and processes them together:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#timeWindowAll-org.apache.flink.streaming.api.windowing.time.Time-org.apache.flink.streaming.api.windowing.time.Time-
 


Note that it’s defined in DataStream, not in the KeyedStream.

In your example keyBy() is effectively a no-op. Didn’t you mean to use the 
KeyedStream#timeWindow method?

Piotrek

> On 1 Mar 2018, at 09:21, Ashish Attarde  wrote:
> 
> Hi,
> 
> I am new to Flink and in general data processing using stream processors.
> 
> I am using Flink to do real-time correlation between multiple records which 
> are coming as part of the same stream. I am doing an "apply" operation on a 
> time-windowed stream. When I submit the job with a parallelism factor of 4, I 
> am still seeing the apply operation applied with a parallelism factor of 1.
> 
> Here is the piece of code:
> 
> parsedInput.keyBy("mflowHash")
> .timeWindowAll(Time.milliseconds(1000),Time.milliseconds(200))
> .allowedLateness(Time.seconds(10))
> .apply(new CRWindow());
> 
> I am trying to correlate 2 streams, what is the right way to do it? I tried 
> the CEP library and experienced the worst performance. It is taking ~4 
> minutes to do the correlation. The correlation logic is very simple and not 
> compute intensive.
> 
> 
> -- 
> 
> Thanks
> -Ashish Attarde
> 



Re: Does Flink support Hadoop (HDFS) 2.9 ?

2018-03-01 Thread Piotr Nowojski
Hi,

You can build Flink against Hadoop 2.9:
https://issues.apache.org/jira/browse/FLINK-8177 


It seems like convenience binaries will be built by us only starting with 1.5:
https://issues.apache.org/jira/browse/FLINK-8363 
 

Piotrek

> On 1 Mar 2018, at 18:13, Soheil Pourbafrani  wrote:
> 
> I mean Flink 1.4
> 
> On Thursday, March 1, 2018, Soheil Pourbafrani  > wrote:
> > ?



Re: Emulate Tumbling window in Event Time Space

2018-03-09 Thread Piotr Nowojski
Hi,

As Xingcan responded, you could use the already built-in operator for that. 

If you really want to implement something on your own (need a custom feature? 
For fun?), you would have to implement some variation of an 
InternalTimerService from Flink (you can browse the code for inspiration). On 
each processed element you have to keep updating the state of your 
in-memory/in-state windows with timestamps marking when they should be 
triggered. Then, on each processed watermark in your operator, you need to 
trigger/fire the windows matching the processed watermark.
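
A rough plain-Java sketch of that bookkeeping, just to show the idea. All 
names here are hypothetical and none of this is Flink API: it keeps one 
running sum per tumbling window in memory and ignores keys, late elements and 
fault tolerance, which a real operator would have to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of event-time tumbling windows; hypothetical names, not Flink API.
class TumblingWindowSketch {
    private final long windowSize;
    // Max timestamp of a window (end - 1) -> running sum; sorted so due windows come first.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();

    TumblingWindowSketch(long windowSizeMillis) {
        this.windowSize = windowSizeMillis;
    }

    // Called for each element: update the window that the timestamp falls into.
    void onElement(long timestamp, long value) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = windowStart + windowSize - 1;
        openWindows.merge(maxTimestamp, value, Long::sum);
    }

    // Called for each watermark: fire and drop every window whose max timestamp <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Map<Long, Long> due = openWindows.headMap(watermark, true);
        for (Map.Entry<Long, Long> e : due.entrySet()) {
            long end = e.getKey() + 1;
            fired.add("[" + (end - windowSize) + ", " + end + ") sum=" + e.getValue());
        }
        due.clear(); // headMap is a view, so this removes the fired windows
        return fired;
    }

    public static void main(String[] args) {
        TumblingWindowSketch windows = new TumblingWindowSketch(1000);
        windows.onElement(1000, 1);
        windows.onElement(1500, 2);
        windows.onElement(2100, 5);
        System.out.println(windows.onWatermark(1998)); // window [1000, 2000) not complete yet
        System.out.println(windows.onWatermark(1999)); // fires [1000, 2000)
    }
}
```

The point is that firing is driven by the watermark and not by the arrival of 
an element at the exact window end, which is the part that was missing in the 
original attempt.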

Piotrek

> On 9 Mar 2018, at 07:50, Xingcan Cui  wrote:
> 
> Hi Dhruv,
> 
> there’s no need to implement the window logic with the low-level 
> `ProcessFunction` yourself. Flink has provided built-in window operators and 
> you just need to implement the `WindowFunction` for that [1].
> 
> Best,
> Xingcan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#window-functions
>  
> 
> 
>> On 9 Mar 2018, at 1:51 PM, Dhruv Kumar > > wrote:
>> 
>> Hi
>> 
>> I was trying to emulate tumbling window in event time space. Here 
>> 
>>  is the link to my code.
>> I am using the process function to do the custom processing which I want to 
>> do within every window. I am having an issue of how to emit results at the 
>> end of every window since my watermark only gets emitted at every incoming 
>> event (incoming event will mostly not intersect with the end time of any 
>> window). Seems like I need to add a trigger somewhere which fires at the end 
>> of every window. Could any one here help me? Sorry, if I am not clear in 
>> anything. I am quite new to Flink. 
>> 
>> Thanks
>> Dhruv
> 



Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

2018-03-09 Thread Piotr Nowojski
Hi,

Short answer is: no, at the moment clean shutdown is not implemented for 
streaming, but it’s on our to-do list for the future.

Hacky answer: you could implement some custom code, that would wait for at 
least one completed checkpoint after the last input data. But that would 
require modifying a source function or at least wrapping it and there might be 
some corner cases that I haven’t thought about.

Piotrek

> On 9 Mar 2018, at 14:49, Niels van Kaam  wrote:
> 
> Hi,
> 
> I'm working on a custom implementation of a sink which I would like to use 
> with exactly once semantics. Therefore I have implemented the 
> TwoPhaseCommitSinkFunction class as mentioned in this recent post: 
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>  
> 
> 
> I have some integration tests which run jobs using the custom sink with a 
> finite dataset (A RichSourceFunction with a "finite" run method). The tests 
> fail because of missing data. I noticed that is due to the last transaction 
> being aborted.
> 
> When looking into the source code that makes sense because the close() 
> implementation of TwoPhaseCommitSinkFunction calls abort on the current 
> transaction: 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
>  
> 
>  
> 
> I could override this behaviour and perform a commit, but then I would 
> perform a commit without getting the checkpoint completed notification, thus 
> not properly maintaining exactly once guarantees
> 
> Is (and how is) it possible to have end-to-end exactly once guarantees when 
> dealing with (sometimes) finite jobs?
> 
> Thanks!
> Niels
> 



Re: Extremely large job serialization produced by union operator

2018-03-09 Thread Piotr Nowojski
Hi,

Could you provide more details about your queries and setup? Logs could be 
helpful as well.

Piotrek

> On 9 Mar 2018, at 11:00, 杨力  wrote:
> 
> I wrote a flink-sql app with following topography.
> 
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> ...
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
> 
> I have a dozen of TableSources And tens of SQLs. As a result, the number of 
> JDBCAppendTableSink times parallelism, that is the number of concurrent 
> connections to database, is too large for the database server to handle. So I 
> tried union DataStreams before connecting them to the TableSink.
> 
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> \
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> 
> JDBCAppendTableSink
> ... /
> KafkaJsonTableSource -> SQL -> toAppendStream -> Map
> 
> With this strategy, job submission failed with an OversizedPayloadException 
> of 104 MB. Increasing akka.framesize helps to avoid this exception, but job 
> submission hangs and times out.
> 
> I can't understand why a simple union operator would serialize to such a 
> large message. Can I avoid this problem?
> Or can I change some configuration to fix the submission time out?
> 
> Regards,
> Bill



Re: Too many open files on Bucketing sink

2018-03-15 Thread Piotr Nowojski
Hi,

There is a similar open issue: 
https://issues.apache.org/jira/browse/FLINK-8707 


It’s still under investigation and it would be helpful if you could follow up 
on the discussion there and run the same diagnostic commands as Alexander 
Gardner did (mainly, if you could attach the output of the lsof command for the 
TaskManagers).

Last time I was looking into it, most of the open files came from loading 
dependency jars for the operators. It seemed like each task/task slot was 
executed in a separate class loader, so the same dependency was being loaded 
multiple times over and over again.

Thanks, Piotrek

> On 14 Mar 2018, at 19:52, Felix Cheung  wrote:
> 
> I have seen this before as well.
> 
> My workaround was to limit the number of parallelism but it is the 
> unfortunate effect of limiting the number of processing tasks also (and so 
> slowing things down)
> 
> Another alternative is to have bigger buckets (and smaller number of buckets)
> 
> Not sure if there is a good solution.
> 
> From: galantaa 
> Sent: Tuesday, March 13, 2018 7:08:01 AM
> To: user@flink.apache.org
> Subject: Too many open files on Bucketing sink
>  
> Hey all,
> I'm using bucketing sink with a bucketer that creates partition per customer
> per day.
> I sink the files to s3.
> it suppose to work on around 500 files at the same time (according to my
> partitioning).
> 
> I have a critical problem of 'Too many open files'.
> I've upload two taskmanagers, each with 16 slots. I've checked how many open
> files (or file descriptors) exist with 'lsof | wc -l' and it had reached
> over a million files on each taskmanager!
> 
> after that, I'd decreased the num of taskSlots to 8 (4 in each taskmanager),
> and the concurrency dropped.
> checking 'lsof | wc -l' gave around 250k file on each machine. 
> I also checked how many actual files exist in my tmp dir (it works on the
> files there before uploading them to s3) - around 3000.
> 
> I think that each taskSlot works with several threads (maybe 16?), and each
> thread holds a fd for the actual file, and thats how the numbers get so
> high.
> 
> Is that a known problem? Is there anything I can do?
> by now, I filter just 10 customers and it works great, but I have to find a
> real solution so I can stream all the data.
> Maybe I can also work with a single task Slot per machine but I'm not sure
> this is a good idea.
> 
> Thank you very much,
> Alon 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 


Re: Checkpoint is not triggering as per configuration

2018-03-19 Thread Piotr Nowojski
Hi,

Please analyse what was going on in the TaskManager and JobManager before this 
“task is not being executed at the moment”. What is the reason why it is not 
being executed? Was there some exception? Depending on your setup, you might 
need to check your stdout/stderr files (if your code is printing some errors).

Another issue might be that your operators/functions are initialising very 
slowly or are stuck somewhere.

Thanks, Piotrek

> On 19 Mar 2018, at 10:14, ms110400027 Syed Muhammad Abrar Akber 
>  wrote:
> 
> Dear Piotrek;
> The log for task manager shows the following message
> 2018-03-19 17:07:58,000 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Custom File Source (1/1) is not being executed at the 
> moment. Aborting checkpoint.
> I don't know how to fix this issue.  I will highly appreciate your support, 
> if you help me in fixing the issue.
> 
> 
> Further, please guide me where I can find the resources which are helpful for 
> beginners like me to fix such issues.
> Thank you for your support.
> 
> Regards;
> Syed Muhamamd Abrar Akber
> MS110400027
> 
> On Mon, Feb 5, 2018 at 5:33 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Did you check task manager and job manager logs for any problems?
> 
> Piotrek
> 
> > On 5 Feb 2018, at 03:19, syed  > <mailto:ms110400...@vu.edu.pk>> wrote:
> >
> > Hi
> > I am new to the flink world, and trying to understand. Currently, I am using
> > Flink 1.3.2 on a small cluster of 4 nodes,
> > I have configured checkpoint directory at HDFS, and run streaming word count
> > example with my own custom input file of 63M entries,
> > I enabled checkpoint every one second {/env.enableCheckpointing(1000)/}
> >
> > The problem I am facing is checkpoint is only triggered once after 1 second,
> > but no checkpoint afterwards, I run application for more than 5 minutes, but
> > checkpoint history shows only 1 checkpoint triggered and was successful. I
> > don't know why checkpoint not triggering after every second?
> > Please suggest me what is wrong?
> > Thanks in anticipation.
> >
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 
> 



Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Piotr Nowojski
Hi,

What’s your Kafka transaction timeout setting? Please check both the Kafka 
producer configuration (transaction.timeout.ms property) and the Kafka broker 
configuration. The most likely cause of such an error message is that Kafka's 
timeout is smaller than Flink’s checkpoint interval and transactions are not 
committed quickly enough before the timeout occurs.

Piotrek

> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
> 
> 
> Hi,
> 
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
> 7th, ... checkpoints:
> --
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
> --
> 
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
> 
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
> 
> After the first checkpoint completed [see history after 1st ckpt.png], the 
> job is restarted due to the ProducerFencedException [see exception after 1st 
> ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint interval 
> is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a while 
> [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first 
> checkpoint.
> 
> Can anybody let me know what's going wrong behind the scene?
> 
> Best,
> 
> Dongwon



Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Piotr Nowojski
Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s 
checkpoint interval. I’m pretty sure the issue here is that those two values 
are overlapping. I think that’s even visible on the screenshots: the first 
completed checkpoint started at 14:28:48 and ended at 14:30:43, while the 
second one started at 14:45:53 and ended at 14:49:16. That gives you a minimal 
transaction duration of 15 minutes and 10 seconds, and a maximal transaction 
duration of about 21 minutes.

In the happy scenario (without any failure and restarting), you should assume 
that your timeout should cover, with some safety margin, the period between 
the start of a checkpoint and the end of the NEXT checkpoint, since this is 
the upper bound on how long the transaction might be used. In your case that 
is at least ~25 minutes.
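
The arithmetic can be checked with a few lines of plain Java. This is only a 
sketch under some assumptions: the timestamps are the ones read off the 
screenshots, the ~25 minutes is interpreted here as checkpoint interval plus 
checkpoint timeout (15m + 10m), and the 5 minute margin is an arbitrary 
example value. The class and method names are made up for the illustration; 
only the transaction.timeout.ms property name comes from Kafka. The broker 
side cap (transaction.max.timeout.ms) would have to allow the chosen value as 
well.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.Properties;

// Hypothetical helper for reasoning about the Kafka transaction timeout.
class TransactionTimeoutSketch {

    // Shortest time a transaction stays open: end of one checkpoint to start of the next.
    static Duration minOpen(LocalTime previousEnd, LocalTime nextStart) {
        return Duration.between(previousEnd, nextStart);
    }

    // Longest time a transaction stays open: start of one checkpoint to end of the next.
    static Duration maxOpen(LocalTime previousStart, LocalTime nextEnd) {
        return Duration.between(previousStart, nextEnd);
    }

    // Assumed failure-free worst case: checkpoint interval + checkpoint timeout + margin.
    static Duration suggestedTimeout(Duration interval, Duration checkpointTimeout,
                                     Duration margin) {
        return interval.plus(checkpointTimeout).plus(margin);
    }

    public static void main(String[] args) {
        // Timestamps from the screenshots, as quoted above.
        LocalTime firstStart = LocalTime.parse("14:28:48");
        LocalTime firstEnd = LocalTime.parse("14:30:43");
        LocalTime secondStart = LocalTime.parse("14:45:53");
        LocalTime secondEnd = LocalTime.parse("14:49:16");

        System.out.println("min open: " + minOpen(firstEnd, secondStart));
        System.out.println("max open: " + maxOpen(firstStart, secondEnd));

        // 15m interval and 10m checkpoint timeout from the job; 5m margin is an example.
        Duration timeout = suggestedTimeout(
                Duration.ofMinutes(15), Duration.ofMinutes(10), Duration.ofMinutes(5));

        // Properties that would be handed to the Kafka producer.
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", Long.toString(timeout.toMillis()));
        System.out.println(producerProps);
    }
}
```

The exact maximum from the screenshots is 20 minutes 28 seconds, which is the 
"21 minutes" mentioned above after rounding up. Both values are above a 15 
minute timeout.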

On top of that, as described in the docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), 
in case of failure, your timeout must also be able to cover the additional 
downtime required for the successful job restart. Thus you should increase your 
timeout accordingly. 

Piotrek


> On 20 Mar 2018, at 11:58, Dongwon Kim  wrote:
> 
> Hi Piotr,
> 
> We have set producer's [transaction.timeout.ms 
> <http://transaction.timeout.ms/>] to 15 minutes and have used the default 
> setting for broker (15 mins).
> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
> Kafka's timeout is smaller than Flink's checkpoint interval.
> As our first checkpoint just takes 2 minutes, it seems like transaction is 
> not committed properly.
> 
> Best,
> 
> - Dongwon
> 
> 
> 
> 
> 



Re: Kafka ProducerFencedException after checkpointing

2018-03-21 Thread Piotr Nowojski
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the 
external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.

I think that a 15-minute timeout is far too small. If your job fails 
because of some intermittent failure (for example a worker crash/restart), you 
will only have a couple of minutes for a successful Flink job restart. 
Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries. 
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
> [see screenshot_10min_ckpt.png].
> 
> I though that producer's transaction timeout starts when the external 
> transaction starts.
> The truth is that Producer's transaction timeout starts after the last 
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for 
> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
> everything is working fine.
> Am I right?
> 
> Anyway thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> 
> 

Re: Standalone cluster instability

2018-03-21 Thread Piotr Nowojski
Hi,

Does the issue really happen after 48 hours? 
Is there some indication of a failure in TaskManager log?

If you are still unable to solve the problem, please provide the full 
TaskManager and JobManager logs.

Piotrek

> On 21 Mar 2018, at 16:00, Alexander Smirnov  
> wrote:
> 
> One more question - I see a lot of line like the following in the logs
> 
> [2018-03-21 00:30:35,975] ERROR Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320 
> ] with UID [1500204560] 
> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
> [2018-03-21 00:34:15,208] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:41068 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,235] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40677 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,256] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40382 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,256] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:44744 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> [2018-03-21 00:34:15,266] WARN Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ] with unknown UID is 
> irrecoverably failed. Address cannot be quarantined without knowing the UID, 
> gating instead for 5000 ms. (akka.remote.Remoting)
> 
> 
> The host is available, but I don't understand where port number comes from. 
> Task Manager uses another port (which is printed in logs on startup)
> Could you please help to understand why it happens?
> 
> Thank you,
> Alex
> 
> 
> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov 
> mailto:alexander.smirn...@gmail.com>> wrote:
> Hello,
> 
> I've assembled a standalone cluster of 3 task managers and 3 job managers(and 
> 3 ZK) following the instructions at 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
>  
> 
>  and 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>  
> 
> 
> It works ok, but randomly, task managers becomes unavailable. JobManager has 
> exception like below in logs:
> 
> 
> [2018-03-19 00:33:10,211] WARN Association with remote system 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ] has failed, address is 
> now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413 
> ]] Caused by: [Connection 
> refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413 
> ] 
> (akka.remote.ReliableDeliverySupervisor)
> [2018-03-21 00:30:35,975] ERROR Association to 
> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320 
> ] with UID [1500204560] 
> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
> java.util.concurrent.TimeoutException: Remote system has been silent for too 
> long. (more than 48.0 hours)
> at 
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at 
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  

Re: Error running on Hadoop 2.7

2018-03-21 Thread Piotr Nowojski
Hi,

Have you replaced all of your old Flink binaries with freshly downloaded 
Hadoop 2.7 versions? Are you sure that something didn’t get mixed up in the 
process?

Does a simple word count example work on the cluster after the upgrade?

Piotrek

> On 21 Mar 2018, at 16:11, ashish pok  wrote:
> 
> Hi All,
> 
> We ran into a roadblock in our new Hadoop environment, migrating from 2.6 to 
> 2.7. It was supposed to be an easy lift to get a YARN session, but it doesn't 
> seem like it :) We definitely are using 2.7 binaries, but it looks like there 
> is a call here to a private method, which screams runtime incompatibility. 
> 
> Has anyone seen this, or does anyone have pointers?
> 
> Thanks, Ashish
> Exception in thread "main" java.lang.IllegalAccessError: tried to access 
> method 
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>  from class 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
> at 
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)
> at 
> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
> at 
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
> at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
> at 
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:314)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:417)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:367)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:679)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:514)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:511)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:511)
> 



Re: Confluent Schema Registry DeserializationSchema

2018-03-21 Thread Piotr Nowojski
Hi,

It looks to me like kafka.utils.VerifiableProperties comes from the 
org.apache.kafka:kafka artifact. Please check your pom.xml for dependency 
conflicts involving this artifact and resolve them if possible. There is 
probably a version collision.

Piotrek

> On 21 Mar 2018, at 16:40, dim5b  wrote:
> 
> I am trying to connect to the schema registry and deserialize the project. 
> 
> I am building my project, and on mvn build I get the error:
> 
> class file for kafka.utils.VerifiableProperties not found...
> 
> 
> import java.io.IOException;
> 
> import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
> import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
> import io.confluent.kafka.serializers.KafkaAvroDecoder;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> 
> 
> public class ConfluentAvroDeserializationSchema implements
>         DeserializationSchema<CelloAvro> {
> 
>     private final String schemaRegistryUrl;
>     private final int identityMapCapacity;
>     // Created lazily on first use; transient so it is not serialized with the schema.
>     private transient KafkaAvroDecoder kafkaAvroDecoder;
> 
>     public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
>         this(schemaRegistryUrl, 1000);
>     }
> 
>     public ConfluentAvroDeserializationSchema(String schemaRegistryUrl,
>             int identityMapCapacity) {
>         this.schemaRegistryUrl = schemaRegistryUrl;
>         this.identityMapCapacity = identityMapCapacity;
>     }
> 
>     @Override
>     public CelloAvro deserialize(byte[] bytes) throws IOException {
>         if (kafkaAvroDecoder == null) {
>             SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(
>                     this.schemaRegistryUrl, this.identityMapCapacity);
>             this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
>         }
>         return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
>     }
> 
>     @Override
>     public boolean isEndOfStream(CelloAvro celloAvro) {
>         return false;
>     }
> 
>     @Override
>     public TypeInformation<CelloAvro> getProducedType() {
>         return TypeExtractor.getForClass(CelloAvro.class);
>     }
> }
> 
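> My dependencies are:
> 
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-avro</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> 
> <dependency>
>   <groupId>io.confluent</groupId>
>   <artifactId>kafka-avro-serializer</artifactId>
>   <version>4.0.0</version>
> </dependency>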
> 
> 
> Could someone please help? I see there is an open issue for an end-to-end
> test with Confluent's Schema Registry:
> 
> https://issues.apache.org/jira/browse/FLINK-8970
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Error running on Hadoop 2.7

2018-03-21 Thread Piotr Nowojski
Hi,

> Does a simple word count example work on the cluster after the upgrade?

If not, maybe your job is pulling in some dependency that’s causing this version 
conflict?
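
One way to check for mixed binaries or a conflicting dependency is to print where the JVM actually loads the two Hadoop classes from the stack trace. The sketch below is only an illustration: `ClassOrigin` is a hypothetical helper name, not part of Flink or Hadoop, and it only tells you something useful when run with the same classpath as the YARN session client.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports which jar or directory a class is loaded from. */
final class ClassOrigin {

    /** Returns "<class name> -> <location>", or a note if the class cannot be loaded. */
    static String describe(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            // Classes from the JDK itself may not report a code source.
            return className + " -> " + (location == null ? "(no code source, likely JDK)" : location);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not found on classpath (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Defaults are the two classes named in the IllegalAccessError; pass others as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider",
            "org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

If the two classes turn out to come from different jars (or different Hadoop versions), that would be consistent with the IllegalAccessError above.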

Piotrek

> On 21 Mar 2018, at 16:52, ashish pok  wrote:
> 
> Hi Piotrek,
> 
> Yes, this is a brand new Prod environment. 2.6 was in our lab.
> 
> Thanks,
> 
> -- Ashish
> 
> On Wed, Mar 21, 2018 at 11:39 AM, Piotr Nowojski
>  wrote:
> Hi,
> 
> Have you replaced all of your old Flink binaries with freshly downloaded 
> <https://flink.apache.org/downloads.html> Hadoop 2.7 versions? Are you sure 
> that nothing got mixed up in the process?
> 
> Does a simple word count example work on the cluster after the upgrade?
> 
> Piotrek
> 
> 



Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Piotr Nowojski
Hi,

If I’m not mistaken, this is a known issue that we have been working to resolve 
for the Flink 1.5 release. The problem is that under back pressure, data is 
buffered between nodes, and on a checkpoint all of that buffered data must be 
processed before the checkpoint can complete. This is especially problematic if 
processing a single record takes (or can take) a significant amount of time. 

In Flink 1.5 we introduced a mechanism to better control the amount of buffered 
data, which should address this issue (Flink 1.5 should be released within a 
couple of weeks).

In the meantime, you could try out the Flink 1.5 release candidate that has 
just been published, or you could try to reduce the number of configured network 
buffers. Keep in mind, however, that at some point this can decrease your 
maximum throughput:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
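
To get a feeling for how much data the network buffers can hold in flight, here is a small back-of-the-envelope sketch. It assumes the rule of thumb from the configuration page (slots-per-TM^2 * #TMs * 4 buffers) and a buffer size of 32 KiB; please double-check both against the documentation for your version. The class and method names are made up for this example.

```java
/** Rough estimate of network buffer count and the memory they can hold (assumptions, not Flink code). */
final class NetworkBufferEstimate {

    // Assumed buffer (memory segment) size of 32 KiB.
    static final int SEGMENT_SIZE_BYTES = 32 * 1024;

    /** Rule of thumb: slots-per-TM^2 * #TMs * 4. */
    static long recommendedBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4;
    }

    /** Upper bound on the bytes that can sit in that many buffers. */
    static long bytesFor(long buffers) {
        return buffers * SEGMENT_SIZE_BYTES;
    }

    public static void main(String[] args) {
        int slots = 4;          // example value, not taken from this thread
        int taskManagers = 3;   // example value, not taken from this thread
        long buffers = recommendedBuffers(slots, taskManagers);
        System.out.println(buffers + " buffers, up to " + bytesFor(buffers) + " bytes buffered");
    }
}
```

Fewer buffers means less data queued in front of a checkpoint barrier, at the cost of less slack for absorbing bursts.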
 

On the other hand, why does it prevent you from using checkpointing at all? 

Piotr Nowojski 

> On 5 Apr 2018, at 06:10, Edward  wrote:
> 
> I read through this thread and didn't see any resolution to the slow
> checkpoint issue (just that someone resolved their backpressure issue).
> 
> We are experiencing the same problem: 
> - When there is no backpressure, checkpoints take less than 100ms
> - When there is high backpressure, checkpoints take anywhere from 5 minutes
> to 25 minutes.
> 
> This is preventing us from using the checkpointing feature at all, since
> periodic backpressure is unavoidable.
> 
> We are experiencing this when running on Flink 1.4.0.
> We are retaining only a single checkpoint, and the size of retained
> checkpoint is less than 250KB, so there's not a lot of state.
>   state.backend: jobmanager
>   state.backend.async: true
>   state.backend.fs.checkpointdir: hdfs://checkpoints
>   state.checkpoints.num-retained: 1
>   max concurrent checkpoints: 1
>   checkpointing mode: AT_LEAST_ONCE
> 
> One other data point: if I rewrite the job to allow chaining all steps (i.e.
> same parallelism on all steps, so they fit in 1 task slot), the checkpoints
> are still slow under backpressure, but are an order of magnitude faster --
> they take about 60 seconds rather than 15 minutes.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-05 Thread Piotr Nowojski
Hi,

Can you provide more details, such as your configuration, log files, 
screenshots from the web UI, and the Flink version being used?

Piotrek

> On 5 Apr 2018, at 06:07, Alexander Smirnov  
> wrote:
> 
> Hello,
> 
> I've defined restart strategy in flink-conf.yaml as none. WebUI / Job Manager 
> section confirms that.
> But looks like this setting is disregarded.
> 
> When I go into job's configuration in the WebUI, in the Execution 
> Configuration section I can see:
> Max. number of execution retries  Restart with fixed delay (1 
> ms). #2147483647 restart attempts.
> 
> Do you think it is a bug?
> 
> Alex



Re: Checkpoints very slow with high backpressure

2018-04-05 Thread Piotr Nowojski
Thanks for the explanation.

I hope that either 1.5 will solve your issue (please let us know if it 
doesn’t!) or, if you can’t wait, that decreasing the number of network buffers 
can mitigate the problem.

Piotrek

> On 5 Apr 2018, at 08:13, Edward  wrote:
> 
> Thanks for the update Piotr.
> 
> The reason it prevents us from using checkpoints is this:
> We are relying on the checkpoints to trigger commit of Kafka offsets for our
> source (kafka consumers).
> When there is no backpressure this works fine. When there is backpressure,
> checkpoints fail because they take too long, and our Kafka offsets are never
> committed to Kafka brokers (as we just learned the hard way).
> 
> Normally there is no backpressure in our jobs, but when there is some
> outage, then the jobs do experience 
> backpressure when catching up. And when you're already trying to recover
> from an incident, that is not the ideal time for kafka offsets commits to
> stop working.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-05 Thread Piotr Nowojski
18-04-05 22:37:29,545 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - Trying to associate with JobManager leader 
> akka.tcp://flink@localhost:6123/user/jobmanager
> 2018-04-05 22:37:29,552 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager#-853250886] - leader session 
> ----
> 2018-04-05 22:37:30,495 INFO  
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>   - TaskManager f0b0370186ab3c865db63fe60ca68e08 has started.
> 2018-04-05 22:37:30,497 INFO  
> org.apache.flink.runtime.instance.InstanceManager - Registered 
> TaskManager at 192.168.0.26 
> (akka.tcp://flink@mb-sr-asmirnov.local:60696/user/taskmanager) as 
> 2972a72a7223e63bb5a4fedd159c0b78. Current number of registered hosts is 1. 
> Current number of alive task slots is 1.
> 2018-04-05 22:38:29,355 INFO  org.apache.flink.runtime.client.JobClient   
>   - Checking and uploading JAR files
> 2018-04-05 22:38:29,639 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job 43ecfe9cb258b7f624aad9868d306edb (Failed job).
> 2018-04-05 22:38:29,643 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=1) for 43ecfe9cb258b7f624aad9868d306edb.
> 2018-04-05 22:38:29,656 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 
> 
> 
> On Thu, Apr 5, 2018 at 10:35 PM Alexander Smirnov wrote:
> Hi Piotr,
> 
> I'm using Flink 1.4.2
> 
> it's a standard flink distribution downloaded and unpacked.
> 
> added the following lines to conf/flink-conf.yaml:
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
> 
> created new java project as described at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  
> 
> here's the code:
> 
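> import java.util.Arrays;
> 
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
> 
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env = 
>             StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // Checkpoint every 5 seconds in exactly-once mode.
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> 
>         DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
> 
>         // Always fails, so the job exercises the restart strategy.
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj) {
>                 throw new NullPointerException("NPE");
>             }
>         });
> 
>         env.execute("Failed job");
>     }
> }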
> 
> attaching screenshots, please let me know if more info is needed
> 
> Alex
> 
> 
>  
> 
> On Thu, Apr 5, 2018 at 5:35 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Can you provide more details, like post your configuration/log files/screen 
> shots from web UI and Flink version being used?
> 
> Piotrek
> 
> > On 5 Apr 2018, at 06:07, Alexander Smirnov  > <mailto:alexander.smirn...@gmail.com>> wrote:
> >
> > Hello,
> >
> > I've defined restart strategy in flink-conf.yaml as none. WebUI / Job 
> > Manager section confirms that.
> > But looks like this setting is disregarded.
> >
> > When I go into job's configuration in the WebUI, in the Execution 
> > Configuration section I can see:
> > Max. number of execution retries  Restart with fixed delay 
> > (1 ms). #2147483647  restart attempts.
> >
> > Do you think it is a bug?
> >
> > Alex
> 



Re: Restart strategy defined in flink-conf.yaml is ignored

2018-04-06 Thread Piotr Nowojski
Thanks!

> On 6 Apr 2018, at 00:30, Alexander Smirnov  
> wrote:
> 
> Thanks Piotr,
> 
> I've created a JIRA issue to track it: 
> https://issues.apache.org/jira/browse/FLINK-9143 
> 
> Alex
> 
> 
> On Thu, Apr 5, 2018 at 11:28 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Thanks for the details! I can confirm this behaviour. The flink-conf.yaml 
> restart-strategy value is completely ignored (regardless of its value) 
> when the user enables checkpointing:
> 
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> 
> I suspect this is a bug, but I have to confirm it.
> 
> Thanks, Piotrek
> 
>> On 5 Apr 2018, at 12:40, Alexander Smirnov > <mailto:alexander.smirn...@gmail.com>> wrote:
>> 
>> jobmanager.log:
>> 
>> 2018-04-05 22:37:28,348 INFO  
>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>> configuration property: restart-strategy, none
>> 2018-04-05 22:37:28,353 INFO  org.apache.flink.core.fs.FileSystem
>>- Hadoop is not in the classpath/dependencies. The extended 
>> set of supported File Systems via Hadoop is not available.
>> 2018-04-05 22:37:28,506 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager without high-availability
>> 2018-04-05 22:37:28,510 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager on localhost:6123 with execution mode 
>> CLUSTER
>> 2018-04-05 22:37:28,517 INFO  
>> org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot 
>> create Hadoop Security Module because Hadoop cannot be found in the 
>> Classpath.
>> 2018-04-05 22:37:28,546 INFO  
>> org.apache.flink.runtime.security.SecurityUtils   - Cannot 
>> install HadoopSecurityContext because Hadoop cannot be found in the 
>> Classpath.
>> 2018-04-05 22:37:28,591 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Trying to start actor system at localhost:6123
>> 2018-04-05 22:37:28,981 INFO  akka.event.slf4j.Slf4jLogger   
>>- Slf4jLogger started
>> 2018-04-05 22:37:29,027 INFO  akka.remote.Remoting   
>>- Starting remoting
>> 2018-04-05 22:37:29,129 INFO  akka.remote.Remoting   
>>- Remoting started; listening on addresses 
>> :[akka.tcp://flink@localhost:6123 <>]
>> 2018-04-05 22:37:29,135 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Actor system started at akka.tcp://flink@localhost:6123 <>
>> 2018-04-05 22:37:29,148 INFO  
>> org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics 
>> reporter configured, no metrics will be exposed/reported.
>> 2018-04-05 22:37:29,152 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager web frontend
>> 2018-04-05 22:37:29,161 INFO  
>> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
>> location of JobManager log file: 
>> /Users/asmirnov/flink-1.4.2/log/flink-jobmanager-0.log
>> 2018-04-05 22:37:29,161 INFO  
>> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
>> location of JobManager stdout file: 
>> /Users/asmirnov/flink-1.4.2/log/flink-jobmanager-0.out
>> 2018-04-05 22:37:29,162 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Using 
>> directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/flink-web-901a3fb7-d366-4f90-b75c-1e1f8038ed37
>>  for the web interface files
>> 2018-04-05 22:37:29,162 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Created 
>> directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/flink-web-21e5d8a8-7967-40f0-97d7-a803d9bd5913
>>  for web frontend JAR file uploads.
>> 2018-04-05 22:37:29,447 INFO  
>> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
>> listening at localhost:8081
>> 2018-04-05 22:37:29,447 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>>- Starting JobManager actor
>> 2018-04-05 22:37:29,452 INFO  org.apache.flink.runtime.blob.BlobServer   
>>- Created BLOB server storage directory 
>> /var/folders/5s/yj6g5wd90h158whcb_483hhhq7t4sw/T/blobStore-6777e862-0c2c-4679-a42f-b1921baa5236
>> 2018-04-05 22:37:29,453 INFO  org.apache.flink.runtime.blob.BlobServ

Re: Insert data into Cassandra without Flink Cassandra connection

2018-05-02 Thread Piotr Nowojski
Hi,

The only way I can think of is to keep your flatMap operator at parallelism 1, 
but that might defeat the purpose. Otherwise there is no way to open a single 
connection and share it across multiple TaskManagers (which can be running on 
different physical machines). Please rethink your solution/approach with 
respect to the distributed nature of Flink.
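
A common alternative is one connection per parallel operator instance, opened once when the instance starts and closed when it stops, rather than one connection per record or one per cluster. The sketch below models that lifecycle in plain Java only. `FakeConnection` and `PerTaskWriter` are hypothetical stand-ins, not Flink or Cassandra driver classes; in a real job the same shape would go into the open() and close() methods of a rich function.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a native Cassandra session; NOT a real driver class. */
final class FakeConnection implements AutoCloseable {
    static final AtomicInteger OPENED = new AtomicInteger();
    private boolean open = true;

    FakeConnection() {
        OPENED.incrementAndGet(); // count how many connections get created
    }

    void insert(String row) {
        if (!open) {
            throw new IllegalStateException("connection already closed");
        }
        // A real implementation would execute an INSERT statement here.
    }

    @Override
    public void close() {
        open = false;
    }
}

/** Plain-Java model of one parallel operator instance with open()/flatMap()/close(). */
final class PerTaskWriter {
    private FakeConnection connection; // one connection per parallel instance
    private int rowsWritten;

    /** Called once before any record is processed. */
    void open() {
        connection = new FakeConnection();
    }

    /** Called per record; reuses the connection instead of reconnecting. */
    void flatMap(String value) {
        connection.insert(value);
        rowsWritten++;
    }

    /** Called once when the instance shuts down. */
    void close() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    int rowsWritten() {
        return rowsWritten;
    }
}

final class PerTaskWriterDemo {
    public static void main(String[] args) {
        // Two instances model a parallelism of 2.
        PerTaskWriter[] tasks = { new PerTaskWriter(), new PerTaskWriter() };
        for (PerTaskWriter task : tasks) {
            task.open();
        }
        for (int i = 0; i < 6; i++) {
            tasks[i % 2].flatMap("row-" + i);
        }
        for (PerTaskWriter task : tasks) {
            task.close();
        }
        System.out.println("connections opened: " + FakeConnection.OPENED.get());
    }
}
```

The number of connections then grows with the parallelism of the operator, not with the number of records.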

However, there are some valid use cases where one would like to have some parts 
of a job graph distributed and other part(s) non-distributed, such as issuing 
a single commit after a distributed write, or processing data in parallel 
but writing it to a relational database like MySQL via a single Sink 
operator.

Piotrek

> On 26 Apr 2018, at 15:23, Soheil Pourbafrani  wrote:
> 
> Here is my code 
> 
> stream.flatMap(new FlatMapFunction() {
> 
> @Override
> public void flatMap(byte[] value, Collector out) throws Exception {
> Parser.setInsert(true);
> CassandraConnection.connect();
> Parser.setInsert(true);
> System.out.println("\n*** New Message ***\n");
> System.out.println("Row Number : " + i ++ );
> System.out.println("Message: " + HexUtiles.bytesToHex(value));
> Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
> }
> });
> 
> 
> On Thu, Apr 26, 2018 at 5:22 PM, Soheil Pourbafrani  > wrote:
> I want to use the native Cassandra connection (not the Flink Cassandra 
> connector) to insert some data into Cassandra. According to the design of the 
> code, the connection to Cassandra is opened once at the start and all 
> taskmanagers use it to write data.  This works fine in local mode.
> 
> The problem is that when I submit the code on a YARN cluster, each taskmanager 
> has its own JVM, so the connection to Cassandra is not shared and I have to 
> open and close it for each taskmanager. Is there any way to have one 
> connection for all taskmanagers?
> 



Re: Odd job failure

2018-05-02 Thread Piotr Nowojski
Hi,

It might be some Kafka issue. 

From what you described your reasoning seems sound. For some reason TM3 fails 
and is unable to restart and process any data, thus forcing spilling on 
checkpoint barriers on TM1 and TM2.

I don’t know the reason behind java.lang.NoClassDefFoundError: 
org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
important in this case.

1. What Kafka version are you using? Have you looked for any known Kafka issues 
with those symptoms?
2. Maybe the easiest thing would be to reformat/reinstall/recreate the TM3 AWS 
image? It might be some system issue.

Piotrek

> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
> 
> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
> failure the other day.  It seems that it started as some sort of network 
> event.  
> 
> It began with the 3rd TM starting to warn every 30 seconds about socket 
> timeouts while sending metrics to DataDog.  This lasted for the whole outage.
> 
> Twelve minutes later, all TMs reported at nearly the same time that they had 
> marked the Kafka coordinator as dead ("Marking the coordinator XXX (id: 
> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
> system attempted to recover it.  Then things got into a weird state.
> 
> The following repeated six or seven times over a period of about 40 
> minutes: 
> - TM attempts to restart the job, but only the first and second TMs show signs 
>   of doing so.
> - The disk begins to fill up on TMs 1 and 2.
> - TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
>   org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on 
>   this list earlier this month.  It is unclear if they are benign.
> - The job dies when the disks finally fill up on 1 and 2.
> 
> Looking at the backtrace logged when the disk fills up, I gather that Flink 
> is buffering data coming from Kafka into one of my operators as a result of a 
> barrier.  The job has a two input operator, with one input the primary data, 
> and a secondary input for control commands.  It would appear that for 
> whatever reason the barrier for the control stream is not making it to the 
> operator, thus leading to the buffering and full disks.  Maybe Flink 
> scheduled the operator source of the control stream on the 3rd TM which seems 
> like it was not scheduling tasks?
> 
> Finally the JM records that it received 13 late messages for already expired 
> checkpoints (could they be from the 3rd TM?), the job is restored one more 
> time and it works.  All TMs report at nearly the same time that they can now 
> find the Kafka coordinator.
> 
> 
> It seems like the 3rd TM had some connectivity issue, but then all TMs seemed 
> to have a problem communicating with the Kafka coordinator at the same time and 
> recovered at the same time.  The TMs are hosted in AWS across AZs, so all of 
> them having connectivity issues at the same time is suspect.  The Kafka node 
> in question was up and other clients in our infrastructure seemed to be able 
> to communicate with it without trouble.  Also, the Flink job itself seemed to 
> be talking to the Kafka cluster while restarting as it was spilling data to 
> disk coming from Kafka.  And the JM did not report any reduction on available 
> task slots, which would indicate connectivity issues between the JM and the 
> 3rd TM.  Yet, the logs in the 3rd TM do not show any record of trying to 
> restore the job during the intermediate attempts.
> 
> What do folks make of it?
> 
> 
> And a question for Flink devs, is there some reason why Flink does not stop 
> spilling messages to disk when the disk is going to fill up?  Seems like 
> there should be a configurable limit to how much data can be spilled before 
> back-pressure is applied to slow down or stop the source.



Re: Apache Flink - Flink Forward SF 2018 - Scaling stream data pipelines (source code)

2018-05-02 Thread Piotr Nowojski
Hi,

Till, do you have this code somewhere?

M Singh: Till is out of the office and will be back next week, so he will 
probably not be able to respond for a couple of days.

Piotrek

> On 30 Apr 2018, at 13:51, M Singh  wrote:
> 
> Hi:
> 
> I was looking at the flink-forward sf 2018 presentations and wanted to find 
> out if there is a git repo where I can find the code for "Scaling stream data 
> pipelines" by Till Rohrmann & Flavio Junqueira ?
> 
> Thanks
> 



Re: Fat jar fails deployment (streaming job too large)

2018-05-02 Thread Piotr Nowojski
Short answer: it could be that your job is simply too big to be serialised, 
distributed and deserialised in the given time, and you would have to increase 
the timeouts even more.

Long answer: 

Do you have the same problem when you try to submit a smaller job? Does your 
cluster work for simpler jobs? Try cutting down/simplifying your job until it 
works. Maybe you will be able to pin down a single operator that is causing 
the problem (for example, one that has a huge static data structure). If 
so, you might be able to optimise your operators in some way. Maybe some 
operator is doing something unusual and causing problems.

You could also approach this problem from the other direction (as previously 
suggested by Fabian). Try to profile the cluster and find out what it is doing 
and where the problem is. The Job Manager? One Task Manager? All of the Task 
Managers? Is there high CPU usage somewhere? Maybe one thread is overloaded? 
High network usage? After identifying potentially problematic JVMs, you could 
attach a code profiler or print stack traces to further pin down the problem. 
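
For a running JVM, the jstack tool that ships with the JDK is usually the quickest way to print stack traces. If you would rather do it from inside the process, the standard java.lang.management API can produce a similar dump. The sketch below is only an illustration; `ThreadDump` is a made-up helper name and nothing here is Flink-specific.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: builds a text dump of all live threads in the current JVM. */
final class ThreadDump {

    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip lock details, which keeps the dump cheap.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Taking several dumps a few seconds apart and comparing them tends to show which threads stay busy in the same place.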

Piotrek

> On 30 Apr 2018, at 21:53, Chan, Regina  wrote:
> 
> Any updates on this one? I'm seeing similar issues with 1.3.3 and the batch 
> api. 
> 
> Main difference is that I have even more operators ~850, mostly maps and 
> filters with one cogroup. I don't really want to set a akka.client.timeout 
> for anything more than 10 minutes seeing that it still fails with that 
> amount. The akka.framesize is already 500Mb... 
> 
> akka.framesize: 524288000b
> akka.ask.timeout: 10min
> akka.client.timeout: 10min
> akka.lookup.timeout: 10min
> 
> 
> Thanks,
> Regina
> 
> 
> 
> -Original Message-
> From: Niels [mailto:nielsdenis...@gmail.com ] 
> Sent: Tuesday, February 27, 2018 11:40 AM
> To: user@flink.apache.org 
> Subject: Re: Fat jar fails deployment (streaming job too large)
> 
> Hi Till,
> 
> I've just tried to set on the *client*:
> akka.client.timeout: 300s 
> 
> On the *cluster*:
> akka.ask.timeout: 30s
> akka.lookup.timeout: 30s
> akka.client.timeout: 300s
> akka.framesize: 104857600b #(10x the original of 10MB)
> akka.log.lifecycle.events: true
> 
> Still gives me the same issue, the fat jar isn't deployed. See the attached
> files for the logs of the jobmanager and the deployer. Let me know if I can
> provide you with any additional info. Thanks for your help!
> 
> Cheers,
> Niels
> 
> Flink_deploy_log.txt
> flink_jobmanager_log.txt
> 
> 
> 
> 
> 
> --
> Sent from: 
> https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dflink-2Duser-2Dmailing-2Dlist-2Darchive.2336050.n4.nabble.com_&d=DwICAg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=vus_2CMQfE0wKmJ4Q_gOWWsBmKlgzMeEwtqShIeKvak&m=p4nMsVlOWZXkIxtRMVt11ovf0gctuHFZJfzvDgpvyKk&s=yX4z6UV1AFsAQtJsVquzujhFio0CgYr-tAIoroUXP8E&e=
>  
> 


Re: ConnectedIterativeStreams and processing state 1.4.2

2018-05-02 Thread Piotr Nowojski
Hi,

Why can't you use a simple CoProcessFunction and handle cache updates within 
its processElement1 or processElement2 method?
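
Roughly, the idea is that one input carries the events to enrich and the other carries lookup results that update the cache. Here is a plain-Java model of that shape, just to illustrate. `EnrichFromCache` is a hypothetical class, the method signatures are simplified (the real CoProcessFunction methods also take a context and a collector), and in a real job the map would live in keyed state rather than in a HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a two-input operator; NOT the Flink CoProcessFunction API. */
final class EnrichFromCache {

    // Stand-in for keyed state: key -> cached enrichment data.
    private final Map<String, String> cache = new HashMap<>();

    /** Input 1: an event to enrich. An empty result means a cache miss (external lookup needed). */
    Optional<String> processElement1(String key, String payload) {
        String enrichment = cache.get(key);
        return enrichment == null
                ? Optional.empty()
                : Optional.of(payload + "|" + enrichment);
    }

    /** Input 2: a lookup result; it only updates the cache. */
    void processElement2(String key, String enrichment) {
        cache.put(key, enrichment);
    }

    public static void main(String[] args) {
        EnrichFromCache operator = new EnrichFromCache();
        System.out.println(operator.processElement1("device-1", "event-a")); // miss
        operator.processElement2("device-1", "DK");
        System.out.println(operator.processElement1("device-1", "event-b")); // hit
    }
}
```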

Piotrek

> On 1 May 2018, at 10:20, Lasse Nedergaard  wrote:
> 
> Hi.
> 
> I have a case where I have a input stream that I want to enrich with external 
> data. I want to cache some of the external lookup data to improve the overall 
> performances.
> To update my cache (a CoProcessFunction) I would use iteration to send the 
> external enriched information back to the cache and update a mapstate. I use 
> CoProcesFunction as the input stream and the enrich stream contains 2 
> diff.object types and I don't want to mix them. 
> Because I use a ConnectedIterativeStream I can't use state in my 
> CoProcessFunction because the ConnectedIterativeStream create a DataStream 
> based on the Feedback signature and not the stream I close the iteration with 
> and it is not possible to provide a keySelector in the withFeedbackType
> 
> From the Flink source:
> public ConnectedIterativeStreams(DataStream input, TypeInformation 
> feedbackType, long waitTime) {
> super(input.getExecutionEnvironment(), input, new 
> DataStream(input.getExecutionEnvironment(), new 
> CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
> }
> and both streams need to be keyed before state are assigned to the operator.
> Any ideas how to workaround this problem?
> 
> My pseudocode is below.
> 
> IterativeStream.ConnectedIterativeStreams iteration 
> = inputStream
> .keyBy(obj -> obj.getkey))
> 
> .iterate(maxWaitForIterations).withFeedbackType(TypeInformation.of(new 
> TypeHint() {}));
> 
> DataStream enrichedStream = iteration
> .process(new EnrichFromState());
> 
> DataStream notEnrichedOutput = enrichedStream
> .filter(obj -> obj.enriched);
> 
> EnrichService EnrichService = new EnrichService();
> DataStream enrichedFromApi = 
> EnrichService.parse(notEnrichedOutput);
> 
> DataStream newEnrich = enrichedFromApi
> .map(obj -> {
> 
> EnrichData newData =  new EnrichData();
> newData.xx = obj.xx();
> 
> return newData;
> })
> .keyBy(obj -> obj.getkey);
> 
> 
> iteration.closeWith(newAddresses);
> 



Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Piotr Nowojski
Hi,

My Scala knowledge is very limited (and my knowledge of Scala serialization is 
nonexistent), but one way or another you have to make your SerializationSchema 
serializable. If this is indeed the problem, maybe a better place to ask this 
question is Stack Overflow or some Scala-specific mailing list/board (unless 
someone else from the Flink community can provide an answer to this problem)? 
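
For what it is worth, here is a minimal plain-Java sketch (not Scala, and not 
tested against avro4s or Flink) of the usual way to keep a class serializable 
while it uses a non-serializable helper: mark the helper field transient and 
rebuild it on first use. NonSerializableEncoder and LazySchema are hypothetical 
names made up for this illustration. Whether the same trick can be applied to 
the implicit SchemaFor/ToRecord/FromRecord instances is something the sketch 
does not settle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a helper that does not implement Serializable. */
final class NonSerializableEncoder {
    byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

/** Hypothetical schema: the helper is never serialized, it is rebuilt on first use. */
final class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient fields are skipped by Java serialization, so this is null after deserialization
    private transient NonSerializableEncoder encoder;

    byte[] serialize(String element) {
        if (encoder == null) {
            encoder = new NonSerializableEncoder();
        }
        return encoder.encode(element);
    }

    /** Round-trips the schema through Java serialization, roughly what happens when a function is shipped to a worker. */
    static LazySchema roundTrip(LazySchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("schema is not serializable", e);
        }
    }
}
```

The round trip only succeeds because the encoder field is transient; without 
that keyword writeObject would fail with a NotSerializableException.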

Piotrek

> On 1 May 2018, at 16:30, Wouter Zorgdrager  wrote:
> 
> So, I'm still struggling with this issue. I dived a bit more into the problem 
> and I'm pretty sure that the problem is that I have to (implicitly) pass the 
> SchemaFor and RecordTo classes to my serialization schema (otherwise I can't 
> make it generic). However, those classes aren't serializable, and of course I 
> can't annotate them as transient or make them a lazy val, which gives me the 
> current issue. 
> 
> I hope someone has some leads for me. 
> 
> Thanks!
> 
> On Thu, 26 Apr 2018 at 23:03, Wouter Zorgdrager wrote:
> Hi Bill,
> 
> Thanks for your answer. However this proposal isn't going to solve my issue, 
> since the problem here is that the context bounds I need to give in order to 
> serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable 
> classes. This results in Flink not being able to serialize the KafkaProducer 
> failing the whole job. 
> 
> Thanks,
> Wouter
> 
> On Thu, 26 Apr 2018 at 00:42, Nortman, Bill wrote:
> The first thing I would try is to make sure your classes Person and Address 
> have getters and setters and a no-argument constructor.
> 
> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
> Sent: Wednesday, April 25, 2018 7:17 AM
> To: user@flink.apache.org 
> Subject: KafkaProducer with generic (Avro) serialization schema
> 
> Dear reader,
> 
> I'm currently working on writing a KafkaProducer which is able to serialize a 
> generic type using avro4s.
> 
> However this serialization schema is not serializable itself. Here is my code 
> for this:
> 
> The serialization schema:
> 
> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord] extends SerializationSchema[IN] {
> 
>   override def serialize(element: IN): Array[Byte] = {
>     val byteArray = new ByteArrayOutputStream()
>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>     avroSer.write(element)
>     avroSer.flush()
>     avroSer.close()
> 
>     return byteArray.toByteArray
>   }
> }
> 
> The job code:
> 
> case class Person(name : String, age : Int, address : Address)
> 
> case class Address(city : String, street : String)
> 
> class SimpleJob {
> 
>   @transient
>   private lazy val serSchema : AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()
> 
>   def start() = {
>     val testPerson = Person("Test", 100, Address("Test", "Test"))
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     env.
>       fromCollection(Seq(testPerson)).
>       addSink(createKafkaSink())
> 
>     env.execute("Flink sample job")
>   }
> 
>   def createKafkaSink() : RichSinkFunction[Person] = {
>     //set some properties
>     val properties = new Properties()
>     properties.put("bootstrap.servers", "127.0.0.1:9092")
>     properties.put("zookeeper.connect", "127.0.0.1:2181")
> 
>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>   }
> 
> }
> 
> The code does compile, however it gives the following error on runtime: 
> InvalidProgramException: Object 
> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d 
> is not serializable.
> 
> I assume this means that my custom SerializationSchema is not serializable 
> due to the use of SchemaFor, FromRecord and ToRecord. 
> 
> Anyone knows a solution or workaround?
> 
> Thanks in advance!
> 
> Wouter
> 

Re: intentional back-pressure (or a poor man's side-input)

2018-05-03 Thread Piotr Nowojski
Maybe it could work with the credit-based flow control in Flink 1.5. But you 
would need a way to express the state “block one input side of the 
CoProcessFunction”, pass this information up to the input gate and handle it, 
probably similarly to how 
`org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs in 
case of a checkpoint barrier. You cannot just block inside the 
`processElement1` method.

However I haven’t thought it through and maybe there could be some issues 
regarding checkpointing (what should happen to checkpoint barriers if you are 
blocking one side of the input? Should this block checkpoint barrier as well? 
Should you cancel checkpoint?).

Piotrek

> On 2 May 2018, at 16:31, Derek VerLee  wrote:
> 
> 
> I was just thinking about letting a CoProcessFunction "block" or cause 
> back pressure on one of its streams.
> Has this been discussed as an option?
> Does anyone know a way to effectively accomplish this?
> 
> I think I could get a lot of mileage out of something like that without 
> needing a full implementation of FLIP-17 (which I would eagerly await still). 
> 
> As mentioned on another thread, one could use a liststate to buffer the main 
> input until the "side input" was sufficiently processed.  However the 
> downside of this is that I have no way to control the size of those buffers, 
> whereas with backpressure, the system will naturally take care of it.



Re: intentional back-pressure (or a poor man's side-input)

2018-05-04 Thread Piotr Nowojski
> running a batch or "bounded stream" job first to generate a "cache state", 
> and then launching the main streaming job, which loads this initial state 
> load in open()... not sure how to work out the keying.
> 

This is the recommended workaround for this issue: first start a job to 
precompute some values for an initial state and then pass those values to the 
main job as (for example) a startup argument. I think for now it’s the cleanest 
and the easiest to maintain solution. If the initial state is too large, you 
could imagine saving it on a DFS and loading it in the initialization phase of 
the main job.
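
As a rough illustration of the loading half, here is a plain-Java sketch, 
assuming the first job wrote the precomputed state as simple "key=value" text 
lines. The file format and the class name InitialStateSketch are assumptions 
made up for this example, not anything Flink prescribes. In the main job 
something like this could be called from open() with a reader on the DFS file. 
It does not address the keying question, since every parallel instance would 
load the whole map.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that loads a precomputed initial state from "key=value" lines. */
final class InitialStateSketch {

    /** Parses every "key=value" line from the reader; lines without a key or '=' are skipped. */
    static Map<String, String> parse(Reader source) {
        Map<String, String> state = new HashMap<>();
        new BufferedReader(source).lines().forEach(line -> {
            int separator = line.indexOf('=');
            if (separator > 0) {
                state.put(line.substring(0, separator), line.substring(separator + 1));
            }
        });
        return state;
    }
}
```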

Piotrek

> On 3 May 2018, at 19:03, Derek VerLee  wrote:
> 
> Thanks for the thoughts Piotr.
> 
> Seems I have a talent for asking (nearly) the same question as someone else 
> at the same time, and the check-pointing was raised in that thread as well.
> 
> I guess one way to conceptualize it is that you have a stream job that has 
> "phases" and transitions between those phases.  Maybe there would be a new 
> type of barrier to indicate a change between phases?  But now I'm way outside 
> the bounds of hoping to have a "quick and dirty" version of a proper side 
> input implementation.
> 
> I'm chewing on two new ideas now:  Using a "union" stream instead of two 
> streams, and custom source backed by two different sources under the hood, so 
> the "state machine" logic transitioning from initialization to normal 
> operation all happen in the same operator.  Or, running a batch or "bounded 
> stream" job first to generate a "cache state", and then launching the main 
> streaming job, which loads this initial state load in open()... not sure how 
> to work out the keying.
> 
> I'll post back if I get anywhere with these ideas.
> 
> 



Re: PartitionNotFoundException after deployment

2018-05-04 Thread Piotr Nowojski
Ufuk: I don’t know why.

+1 for your other suggestions.

Piotrek

> On 4 May 2018, at 14:52, Ufuk Celebi  wrote:
> 
> Hey Gyula!
> 
> I'm including Piotr and Nico (cc'd) who have worked on the network
> stack in the last releases.
> 
> Registering the network structures including the intermediate results
> actually happens **before** any state is restored. I'm not sure why
> this reproducibly happens when you restore state. @Nico, Piotr: any
> ideas here?
> 
> In general I think what happens here is the following:
> - a task requests the result of a local upstream producer, but that
> one has not registered its intermediate result yet
> - this should result in a retry of the request with some backoff
> (controlled via the config params you mention
> taskmanager.network.request-backoff.max,
> taskmanager.network.request-backoff.initial)
> 
> As a first step I would set logging to DEBUG and check the TM logs for
> messages like "Retriggering partition request {}:{}."
> 
> You can also check the SingleInputGate code which has the logic for
> retriggering requests.
> 
> – Ufuk
> 
> 
> On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  wrote:
>> Hi Ufuk,
>> 
>> Do you have any quick idea what could cause this problem in Flink 1.4.2?
>> It seems like one operator takes too long to deploy and downstream tasks error
>> out on partition not found. This only seems to happen when the job is
>> restored from state, and in fact that operator has some keyed and operator
>> state as well.
>> 
>> Deploying the same job from empty state works well. We tried increasing
>> taskmanager.network.request-backoff.max, but that didn't help.
>> 
>> It would be great if you have some pointers on where to look further; I haven't
>> seen this happening before.
>> 
>> Thank you!
>> Gyula
>> 
>> The error:
>> org.apache.flink.runtime.io.network.partition.: Partition
>> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
>>at
>> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>>at
>> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
>>at
>> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
>>at java.util.TimerThread.mainLoop(Timer.java:555)
>>at java.util.TimerThread.run(Timer.java:505)
> 
> 
> 
> -- 
> Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin
> 
> i...@data-artisans.com
> +49-30-43208879
> 
> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



Re: This server is not the leader for that topic-partition

2018-05-04 Thread Piotr Nowojski
Hi,

I think Stefan is right. Quick google search points to this: 
https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
 


Please let us know if changing your configuration will solve the problem!

Piotrek

> On 4 May 2018, at 15:53, Stefan Richter  wrote:
> 
> Hi,
> 
> I think in general this means that your producer client does not connect to 
> the correct Broker (the leader) but to a broker that is just a follower and 
> the follower can not execute that request. However, I am not sure what causes 
> this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) has an 
> idea?
> 
> Best,
> Stefan
> 
>> On 04.05.2018 at 15:45, Alexander Smirnov wrote:
>> 
>> Hi,
>> 
>> what could cause the following exception?
>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed 
>> to send data to Kafka: This server is not the leader for that 
>> topic-partition.
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>>  at 
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>  at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>  at 
>> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>> 
>> 
>> Thank you,
>> Alex
> 



Re: This server is not the leader for that topic-partition

2018-05-05 Thread Piotr Nowojski
FlinkKafkaProducer011 uses Kafka 0.11.0.2. 

However, I’m not sure whether bumping the KafkaProducer version or upgrading 
Kafka would solve this issue. What Kafka version are you using?

Piotrek

> On 4 May 2018, at 17:55, Alexander Smirnov  
> wrote:
> 
> Thanks for the quick turnaround, Stefan, Piotr.
> 
> This issue is rarely reproducible, and I will keep an eye on it.
> 
> Searching on Stack Overflow I found 
> https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash
> 
> They say that the problem is fixed in version 0.10.2.1 of the Kafka producer, 
> so I wonder which version is used in the FlinkKafkaProducer integration. For 
> earlier versions it is proposed to use this configuration:
> 
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);  
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
> Integer.toString(Integer.MAX_VALUE));
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2);
> 
> 
> 



Re: This server is not the leader for that topic-partition

2018-05-07 Thread Piotr Nowojski
Hi,

Regardless of whether that fixes the problem or not, please consider upgrading 
to Kafka 0.11.0.2 or 1.0.1. The Kafka 0.11.0 release was quite messy and it 
might be that the bug you have hit was fixed in 0.11.0.2.

As a side note, as far as we know our FlinkKafkaProducer011 works fine with 
Kafka 1.0.x.

Piotrek

> On 7 May 2018, at 12:12, Alexander Smirnov  
> wrote:
> 
> Hi Piotr, using 0.11.0 Kafka version
> 



Re: Late data before window end is even close

2018-05-11 Thread Piotr Nowojski
Hey,

Actually I think Fabian’s initial message was incorrect. As far as I can see in 
the code of WindowOperator (last lines of 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement), 
the element is sent to the late side output only if it is late AND it wasn’t 
assigned to any of the existing windows (because those windows were late as 
well). In other words, it should work as you, Juho, wish: elements should be 
marked as late only once they are overdue/late for their window, after one full 
day.

I have tested it and it works as expected. The following program:

https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a 

prints only ONE number to standard error:

> 1394

And there is nothing on the side output.
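
To spell out the rule, here is a small plain-Java sketch of my reading of that 
logic. It is a simplification with bare long timestamps, not the actual 
WindowOperator code. The class LatenessSketch and the method 
goesToLateSideOutput are names made up for this illustration, and the 
window-level check (window max timestamp plus allowed lateness compared 
against the watermark) is my understanding of how a window is considered late, 
so please verify it against the Flink source.

```java
/** Hypothetical, simplified model of the lateness decision for one element and one window. */
final class LatenessSketch {

    /** Element-level check, same shape as the isElementLate condition in WindowOperator. */
    static boolean isElementLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    /** Assumed window-level check: the window is late once its cleanup time has passed. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return windowMaxTimestamp + allowedLateness <= watermark;
    }

    /** The element goes to the late side output only if its window was skipped AND the element is late. */
    static boolean goesToLateSideOutput(
            long timestamp, long windowMaxTimestamp, long allowedLateness, long watermark) {
        boolean skipped = isWindowLate(windowMaxTimestamp, allowedLateness, watermark);
        return skipped && isElementLate(timestamp, allowedLateness, watermark);
    }
}
```

With a 24-hour window and one minute of allowed lateness, an element stamped 
12:00 that arrives when the watermark is already 100 seconds ahead passes the 
element-level check, but its window is still open, so under this model it is 
not sent to the side output.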

Piotrek

> On 11 May 2018, at 12:32, Juho Autio  wrote:
> 
> Thanks. What I still don't get is why my message got filtered in the first 
> place. Even if the allowed lateness filtering would be done "on the window", 
> data should not be dropped as late if it's not in fact late by more than the 
> allowedLateness setting.
> 
> Assuming that these conditions hold:
> - messages (and thus the extracted timestamps) were not out of order by more 
> than 5 seconds (as long as I didn't make any mistake in my partition-level 
> analysis)
> - allowedLateness=1 minute
> - watermarks are assigned on kafka consumer meaning that they are 
> synchronized across all partitions
> 
> I don't see how the watermark could have ever been more than 5 seconds 
> further when the message arrives on the isElementLate filter. Do you have any 
> idea on this? Is there some existing test that simulates out of order input 
> to flink's kafka consumer? I could try to build a test case based on that to 
> possibly reproduce my problem. I'm not sure how to gather enough debug 
> information on the production stream so that it would clearly show the 
> watermarks, how they progressed on each kafka partition & later in the chain 
> in case isElementLate filters something.
> 
> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske  > wrote:
> Hi Juho,
> 
> Thanks for bringing up this topic! I share your intuition. 
> IMO, records should only be filtered out and sent to a side output if any of 
> the windows they would be assigned to is closed already.
> 
> I had a look into the code and found that records are filtered out as late 
> based on the following condition:
>  
> protected boolean isElementLate(StreamRecord element){
>     return (windowAssigner.isEventTime()) &&
>         (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
> }
> 
> 
> This code shows that your analysis is correct. 
> Records are filtered out based on their timestamp and the current watermark, 
> even though they arrive before the window is closed.
> 
> OTOH, filtering out records based on the window they would end up in can also 
> be tricky if records are assigned to multiple windows (e.g., sliding windows).
> In this case, a side-outputted record could still be in some windows and not 
> in others. 
> 
> @Aljoscha (CC) Might have an explanation for the current behavior.
> 
> Thanks,
> Fabian
> 
> 
> 2018-05-11 10:55 GMT+02:00 Juho Autio  >:
> I don't understand why I'm getting some data discarded as late on my Flink 
> stream job a long time before the window even closes.
> 
> I can not be 100% sure, but to me it seems like the kafka consumer is 
> basically causing the data to be dropped as "late", not the window. I didn't 
> expect this to ever happen?
> 
> I have a Flink stream job that gathers distinct values using a 24-hour 
> window. It reads the data from Kafka, using a 
> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize 
> watermarks across all kafka partitions. The maxOutOfOrderness of the 
> extractor is set to 10 seconds.
> 
> I have also enabled allowedLateness with 1 minute lateness on the 24-hour 
> window:
> 
> .timeWindow(Time.days(1))
> .allowedLateness(Time.minutes(1))
> .sideOutputLateData(lateDataTag)
> .reduce(new DistinctFunction())
> 
> I have used accumulators to see that there is some late data. I have had 
> multiple occurrences of those.
> 
> Now focusing on a particular case that I was investigating more closely. 
> Around ~12:15 o'clock my late data accumulator started showing that 1 message 
> had been late. That's in the middle of the time window – so why would this 
> happen? I would expect late data to be discarded only sometime after 00:01 if 
> some data is arriving late for the window that just closed at 00:00, and 
> doesn't get emitted as part of 1 minute allowedLateness.
> 
> To analyze the timestamps I read all messages in sequence separately from 
> each kafka partition and calculated the difference in timestamps between 
> consecutive messages. I had had exactly one message categorized as late by 
> Flink in this cas

Re: How to broadcast messages to all task manager instances in cluster?

2018-05-11 Thread Piotr Nowojski
Hi,

I don’t quite understand your problem. If you broadcast the message as an input 
to the operator that depends on this configuration, each instance of that 
operator will receive the configuration. It shouldn't matter whether Flink 
scheduled your operator on one, some or all of the TaskManagers. It should only 
matter whether the operators running your configuration-sensitive code receive 
the broadcasted message.


DataStream<> input = xxx;
DataStream<> controlConfigInput = yyy;

DataStream<> data = input
    .do()
    .something()
    .fancy();

controlConfigInput.broadcast()
    .connect(data)
    .flatMap(new MyFancyOperatorThatDependsOnConfigStream());

Or slide 36 from here: 
https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics 


Piotrek

> On 11 May 2018, at 11:11, Di Tang  wrote:
> 
> Hi guys:
> 
> I have a Flink job which contains multiple pipelines. Each pipeline depends 
> on some configuration. I want to make the configuration dynamic and effective 
> after change so I created a data source which periodically polls the database 
> storing the configuration. However, how can I broadcast the events to all 
> task manager instances?  The datastream.broadcast() only applies to the 
> parallel instances of operator. And I don't want to connect the configuration 
> data source to each pipeline because it is too verbose. If Flink cannot 
> explicitly broadcast messages to task managers, is there any method to 
> guarantee the parallel operator is distributed on all task managers?
> 
> Thanks,
> Di



Re: Checkpoint is not triggering as per configuration

2018-05-11 Thread Piotr Nowojski
Hi,

It’s not considered a bug, only a not-yet-implemented feature (see my previous 
responses for the Jira ticket). Generally speaking, using a file input stream 
for DataStream programs is not very popular, so this has so far been low on our 
priority list.

Piotrek

> On 10 May 2018, at 06:26, xiatao123  wrote:
> 
> I ran into a similar issue.
> 
> Since it is a "Custom File Source", the first source just lists the
> folder/file paths of all existing files. The next operator, "Split Reader",
> reads the content of the files.
> "Custom File Source" went to the "finished" state after the first couple of secs.
> That's why we got this error message: "Custom File Source (1/1) is not being
> executed at the moment. Aborting checkpoint". The "Custom File
> Source" had finished already.
> 
> Is this by design?  Although the "Custom File Source" finishes in secs, the
> rest of the pipeline can run for hours or days. Whenever anything goes
> wrong, the pipeline will restart and start reading from the beginning
> again, since there isn't any checkpoint.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink Forward SF 2018 Videos

2018-05-11 Thread Piotr Nowojski
Hi,

Previous videos were always uploaded there, so I guess the new ones should 
appear there shortly. Laura might know something more about it.

Thanks,
Piotrek

> On 10 May 2018, at 23:44, Rafi Aroch  wrote:
> 
> Hi,
> 
> Are there any plans to upload the videos to the Flink Forward YouTube channel?
> 
> There are so many interesting talks I would like to watch and filling out a 
> form for each and every video makes it more difficult... 
> 
> Thanks,
> Rafi



Re: Late data before window end is even close

2018-05-11 Thread Piotr Nowojski
Generally speaking, the best practice is always to simplify your program as 
much as possible to narrow down the scope of the search. Replace the data 
source with statically generated events, remove unnecessary components, etc. 
Either this process helps you figure out what’s wrong on your own, or, if not, 
you can share with us the minimal program that reproduces the issue, which will 
allow us to debug it.

Piotrek

> On 11 May 2018, at 13:54, Juho Autio  wrote:
> 
> Thanks for that code snippet, I should try it out to simulate my DAG. If you 
> have any suggestions on how to further debug what's causing late data on a 
> production stream job, please let me know.
> 
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hey,
> 
> Actually, I think Fabian's initial message was incorrect. As far as I can see in 
> the code of WindowOperator (last lines of 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement
>  ), the element is sent to the late side output if it is late AND it wasn’t 
> assigned to any of the existing windows (because they were late as well). In 
> other words, it should work as you, Juho, wish: elements should be marked 
> as late only once they are overdue for the window, after one full day.
> 
> I have tested it and it works as expected. Following program:
> 
> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a 
> <https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a>
> 
> Prints only ONE number to the standard err:
> 
> > 1394
> 
> And there is nothing on the side output.
> 
> Piotrek
> 
>> On 11 May 2018, at 12:32, Juho Autio > <mailto:juho.au...@rovio.com>> wrote:
>> 
>> Thanks. What I still don't get is why my message got filtered in the first 
>> place. Even if the allowed lateness filtering would be done "on the window", 
>> data should not be dropped as late if it's not in fact late by more than the 
>> allowedLateness setting.
>> 
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by more 
>> than 5 seconds (as far as I didn't make any mistake in my partition-level 
>> analysis)
>> - allowedLateness=1 minute
>> - watermarks are assigned on kafka consumer meaning that they are 
>> synchronized across all partitions
>> 
>> I don't see how the watermark could have ever been more than 5 seconds 
>> further when the message arrives on the isElementLate filter. Do you have 
>> any idea on this? Is there some existing test that simulates out of order 
>> input to flink's kafka consumer? I could try to build a test case based on 
>> that to possibly reproduce my problem. I'm not sure how to gather enough 
>> debug information on the production stream so that it would clearly show the 
>> watermarks, how they progressed on each kafka partition & later in the chain 
>> in case isElementLate filters something.
>> 
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske > <mailto:fhue...@gmail.com>> wrote:
>> Hi Juho,
>> 
>> Thanks for bringing up this topic! I share your intuition. 
>> IMO, records should only be filtered out and sent to a side output if any of 
>> the windows they would be assigned to is closed already.
>> 
>> I had a look into the code and found that records are filtered out as late 
>> based on the following condition:
>>  
>> protected boolean isElementLate(StreamRecord<IN> element) {
>>     return (windowAssigner.isEventTime()) &&
>>         (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
>> }
>> 
>> 
>> This code shows that your analysis is correct. 
>> Records are filtered out based on their timestamp and the current watermark, 
>> even though they arrive before the window is closed.
>> 
>> OTOH, filtering out records based on the window they would end up in can 
>> also be tricky if records are assigned to multiple windows (e.g., sliding 
>> windows).
>> In this case, a side-outputted record could still be in some windows and 
>> not in others. 
>> 
>> @Aljoscha (CC) Might have an explanation for the current behavior.
>> 
>> Thanks,
>> Fabian
>> 
>> 
>> 2018-05-11 10:55 GMT+02:00 Juho Autio > <mailto:juho.au...@rovio.com>>:
>> I don't understand why I'm getting some data discarded as late on my Flink 
>> stream job a long time before the window even closes.
>> 
>> I can not be 100% sure, but to me it seems like the kafka con

Re: How to broadcast messages to all task manager instances in cluster?

2018-05-14 Thread Piotr Nowojski
Hi,

Thanks for the clarification. This might be tough. Generally speaking, having 
such static configuration shared across multiple operators/functions can pose 
lots of different problems, including synchronisation, fault tolerance, etc. 

To be honest, you should treat such a thing almost like an external system that 
has an external state, because from Flink’s perspective that’s exactly what it 
is: the equivalent of having an external “configuration service” 
hosted/stored somewhere outside of Flink. With it you have to manually take 
care of fault tolerance (especially of its state), since it’s outside of Flink’s 
control. In particular, think about what should happen to your static 
configuration if one of your machines fails/restarts and Flink chooses to 
restart only part of the job graph (possibly one, many or all of the 
operators). How will your static configuration be kept in sync across all of 
the Task Managers in that case?

It would be easier if you could restructure your job/problem and replace such 
static configuration with a configuration stored in Flink’s state (maybe in 
one operator? Or on parallel instances of one task?). Otherwise, to make it 
fully reliable, I think you would need to write quite a lot of code on your own. 

Alternatively, you can consider using a third-party system such as Apache 
ZooKeeper for storing the configuration.

Piotrek
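
The broadcast-and-connect pattern quoted below can be modelled without Flink to 
see why it does not depend on where the instances are scheduled. This is only a 
plain-Java sketch: `ConfigAwareInstance`, `onConfig` and `onData` are 
hypothetical names standing in for one parallel instance of a two-input 
operator, and the loop in `broadcast` stands in for delivering a control 
message to every instance. In a real job the configuration field would be kept 
in Flink state rather than in a plain field.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastConfigSketch {
    // One parallel instance of an operator with a control input and a data input.
    static class ConfigAwareInstance {
        private String currentConfig = "default";

        // Called for every message on the (broadcast) control input.
        void onConfig(String newConfig) {
            currentConfig = newConfig;
        }

        // Called for every record on the data input; uses the latest config seen.
        String onData(String record) {
            return record + "@" + currentConfig;
        }
    }

    static List<ConfigAwareInstance> createInstances(int parallelism) {
        List<ConfigAwareInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ConfigAwareInstance());
        }
        return instances;
    }

    // Broadcasting means every instance receives its own copy of the message.
    static void broadcast(List<ConfigAwareInstance> instances, String config) {
        for (ConfigAwareInstance instance : instances) {
            instance.onConfig(config);
        }
    }

    public static void main(String[] args) {
        List<ConfigAwareInstance> instances = createInstances(3);
        broadcast(instances, "v2");
        for (ConfigAwareInstance instance : instances) {
            System.out.println(instance.onData("record"));
        }
    }
}
```

Each instance holds its own copy of the configuration, so nothing is shared 
through static variables and it does not matter which Task Manager an instance 
runs on.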

> On 13 May 2018, at 10:38, Di Tang  wrote:
> 
> Thanks Piotr for the response. I have many data streams that depend on the 
> configuration by reading values from static variables in a class. A 
> configuration change works by changing the values of those static variables. 
> Since each task manager only has one JVM process, as long as the 
> message is broadcast to each task manager, the data streams will see the 
> change. The logic in the data streams is quite simple: they just get some 
> parameters from the static variables. So I think adding connect and flatMap to 
> each of them is too verbose. I am wondering whether there is a better way to 
> express this.
> 
> Piotr Nowojski mailto:pi...@data-artisans.com>> 于 
> 2018年5月11日周五 下午7:31写道:
> Hi,
> 
> I don’t quite understand your problem. If you broadcast a message as an input 
> to your operator that depends on this configuration, each instance of your 
> operator will receive this configuration. It shouldn't matter whether Flink 
> scheduled your operator on one, some or all of the TaskManagers. It should 
> only matter whether the operators running your configuration-sensitive code 
> receive the broadcast message.
> 
> 
> DataStream<> input = xxx;
> DataStream<> controlConfigInput = yyy;
> 
> DataStream<> data = input
>   .do()
>   .something()
>   .fancy();
> 
> controlConfigInput.broadcast()
>   .connect(data)
>   .flatMap(new MyFancyOperatorThatDependsOnConfigStream());
> 
> Or slide 36 from here: 
> https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics 
> <https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics>
> 
> Piotrek
> 
>> On 11 May 2018, at 11:11, Di Tang > <mailto:tangdi.b...@gmail.com>> wrote:
>> 
>> Hi guys:
>> 
>> I have a Flink job which contains multiple pipelines. Each pipeline depends 
>> on some configuration. I want to make the configuration dynamic and 
>> effective after a change, so I created a data source which periodically polls 
>> the database storing the configuration. However, how can I broadcast the 
>> events to all task manager instances?  The datastream.broadcast() only 
>> applies to the parallel instances of operator. And I don't want to connect 
>> the configuration data source to each pipeline because it is too verbose. If 
>> Flink cannot explicitly broadcast messages to task managers, is there any 
>> method to guarantee the parallel operator is distributed on all task 
>> managers?
>> 
>> Thanks,
>> Di
> 



Re: Checkpoint is not triggering as per configuration

2018-05-15 Thread Piotr Nowojski
Hi,

This one: https://issues.apache.org/jira/browse/FLINK-2491 
<https://issues.apache.org/jira/browse/FLINK-2491>

1. What if you set 
`org.apache.flink.streaming.api.functions.source.FileProcessingMode#PROCESS_CONTINUOUSLY`?
 This will prevent the split source from finishing, so checkpointing should work 
fine. The downside is that you would have to determine manually, on your own, 
whether the job has finished/completed or not.

Other things that come to my mind would require some coding:

2. Look at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#createFileInput,
 copy its code and replace `ContinuousFileMonitoringFunction` with something 
that finishes on some custom event/action/condition. The code that you would 
have to modify/replace is alongside the usages of `FileProcessingMode 
monitoringMode`.

3. Probably even more complicated: you could modify 
ContinuousFileReaderOperator to be a source function with a statically 
precomputed list of files/splits to process (they would have to be 
assigned/distributed taking parallelism into account). That way your source 
functions would complete not when the splits are generated, but when they have 
finished reading the splits.
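
For option 3, the two pieces that need care are distributing the precomputed 
splits across the parallel instances and remembering how far each instance has 
read, so that a restore does not start over. Below is a minimal plain-Java 
sketch of that bookkeeping, assuming a simple round-robin assignment. 
`SplitAssignmentSketch` and its methods are hypothetical helpers, not part of 
Flink, and a real source function would keep the position in checkpointed 
state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitAssignmentSketch {
    // Round-robin assignment: split i is handled by subtask (i % parallelism).
    static List<String> splitsFor(List<String> allSplits, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allSplits.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allSplits.get(i));
            }
        }
        return mine;
    }

    // Splits still to be read after a restore, given how many splits this
    // subtask had fully read when the last checkpoint was taken.
    static List<String> remainingAfterRestore(List<String> mySplits, int completedSplits) {
        return new ArrayList<>(mySplits.subList(completedSplits, mySplits.size()));
    }

    public static void main(String[] args) {
        List<String> allSplits = Arrays.asList("a", "b", "c", "d", "e");
        List<String> subtask0 = splitsFor(allSplits, 0, 2);
        System.out.println(subtask0);
        System.out.println(splitsFor(allSplits, 1, 2));
        System.out.println(remainingAfterRestore(subtask0, 1));
    }
}
```

Because the assignment is a pure function of the split list, the subtask index 
and the parallelism, every instance can recompute its share after a restart and 
only the per-instance position needs to be checkpointed.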

Piotrek

> On 14 May 2018, at 20:29, Tao Xia  wrote:
> 
> Thanks for the reply Piotr. Which Jira ticket were you referring to?
> We were trying to use the same code as for normal stream processing to process 
> very old historical backfill data.
> The problem for me right now is that backfilling x years of data will be very 
> slow, and I cannot have any checkpoint during the whole time since FileSource 
> is "Finished". When anything goes wrong in the middle, the whole pipeline 
> will start over from the beginning again.
> Is there any way I can skip the checkpoint of "Source: Custom File Source" but 
> still have checkpoints on "Split Reader: Custom File Source"?
> Thanks,
> Tao
> 
> On Fri, May 11, 2018 at 4:34 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> It’s not considered a bug, only a missing, not yet implemented feature 
> (check my previous responses for the Jira ticket). Generally speaking, using 
> a file input stream for DataStream programs is not very popular, thus this was 
> so far low on our priority list.
> 
> Piotrek
> 
> > On 10 May 2018, at 06:26, xiatao123  > <mailto:t...@udacity.com>> wrote:
> > 
> > I ran into a similar issue.
> > 
> > Since it is a "Custom File Source", the first source just lists the
> > folder/file paths of all existing files. The next operator, "Split Reader",
> > will read the content of the files.  
> > "Custom File Source" went to the "finished" state after the first couple of secs. 
> > That's why we got this error message: "Custom File Source (1/1) is not being
> > executed at the moment. Aborting checkpoint". Because the "Custom File
> > Source" finished already.
> > 
> > Is this by design?  Although the "Custom File Source" finished in secs, the
> > rest of the pipeline can run for hours or days. Whenever anything goes
> > wrong, the pipeline will restart and start reading from the beginning
> > again, since there isn't any checkpoint.
> > 
> > 
> > 
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 
> 



Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

2018-05-15 Thread Piotr Nowojski
Hi,

It looks like there was an error in the asynchronous job of sending the records 
to Kafka. This is probably collateral damage from losing the connection to 
ZooKeeper. 

Piotrek

> On 15 May 2018, at 13:33, Ufuk Celebi  wrote:
> 
> Hey Tony,
> 
> thanks for the detailed report.
> 
> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and 
> recovered when the connection is re-established (and one JM becomes leader 
> again).
> 
> - Regarding the KafkaProducer: I'm not sure from the log message whether 
> Flink closes the KafkaProducer because the job is cancelled or because there 
> is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this 
> thread who has worked on the KafkaProducer in the past. If it is a 
> connectivity issue, it might also explain why you lost the connection to ZK.
> 
> Glad to hear that everything is back to normal. Keep us updated if something 
> unexpected happens again.
> 
> – Ufuk
> 
> 
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei  > wrote:
> Hi all,
> 
> I restarted the cluster and changed the log level to DEBUG, and raised the 
> parallelism of my streaming job from 32 to 40.
> However, the problem just disappeared and I don't know why.
> I will keep these settings for a while. If the error happens again, I will 
> bring more information back for help. Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-14 14:24 GMT+08:00 Tony Wei  >:
> Hi all,
> 
> After I changed the `high-availability.zookeeper.client.session-timeout` and 
> `maxSessionTimeout` to 12ms, the exception still occurred.
> 
> Here is the log snippet. It seems this has nothing to do with the ZooKeeper 
> client timeout, but I still don't know why the Kafka producer would be closed 
> without any task state change.
> 
> ```
> 2018-05-14 05:18:53,468 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 82828ms for sessionid 
> 0x305f957eb8d000a
> 2018-05-14 05:18:53,468 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 82828ms for sessionid 
> 0x305f957eb8d000a, closing socket connection and attempting reconnect
> 2018-05-14 05:18:53,571 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: SUSPENDED
> 2018-05-14 05:18:53,574 WARN  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
> 2018-05-14 05:18:53,850 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> 2018-05-14 05:18:53,850 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server XXX.XXX.XXX.XXX:2181
> 2018-05-14 05:18:53,852 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> 2018-05-14 05:18:53,853 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to XXX.XXX.XXX.XXX:2181, initiating session
> 2018-05-14 05:18:53,859 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 
> 0x305f957eb8d000a, negotiated timeout = 12
> 2018-05-14 05:18:53,860 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: RECONNECTED
> 2018-05-14 05:18:53,860 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>   - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>   - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> 
> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from 
> RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
> before a response was received.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>   at 

Re: Decrease initial source read speed

2018-05-23 Thread Piotr Nowojski
Hi,

Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely, you 
will be able to limit the “in flight” data by controlling the number of 
assigned credits per channel/input gate. Even without any configuration, Flink 
1.5.0 will buffer less data out of the box, thus mitigating the problem.

There are some tweaks that you could use to make 1.4.x work better. With small 
records that require heavy processing, generally speaking you do not need huge 
buffer sizes to keep max throughput. You can try to both reduce the buffer 
pool and reduce the memory segment size:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
 

• taskmanager.network.memory.fraction: Fraction of JVM memory to use 
for network buffers (DEFAULT: 0.1),
• taskmanager.network.memory.min: Minimum memory size for network 
buffers in bytes (DEFAULT: 64 MB),
• taskmanager.network.memory.max: Maximum memory size for network 
buffers in bytes (DEFAULT: 1 GB), and
• taskmanager.memory.segment-size: Size of memory buffers used by the 
memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

Reducing those values will reduce the amount of in-flight data that will be 
caught between checkpoints. Keep in mind that smaller values can lead to lower 
throughput, but as I said, with a small number of heavy-processing records this 
is not an issue. In an extreme example, if your records are, let's say, 8 bytes 
each and require 1 hour to process, there is almost no need for any buffering. 
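
To put rough numbers on that extreme example, here is a back-of-the-envelope 
sketch of how much processing time a single full buffer of such records 
represents. It assumes the default 32768-byte segment listed above and ignores 
any per-record framing overhead; the class and method names are illustrative 
only.

```java
public class BufferBacklogSketch {
    // How many fixed-size records fit into one memory segment (framing overhead ignored).
    static long recordsPerBuffer(int segmentSizeBytes, int recordSizeBytes) {
        return segmentSizeBytes / recordSizeBytes;
    }

    // Hours of processing time queued up in one full buffer.
    static long backlogHours(int segmentSizeBytes, int recordSizeBytes, long hoursPerRecord) {
        return recordsPerBuffer(segmentSizeBytes, recordSizeBytes) * hoursPerRecord;
    }

    public static void main(String[] args) {
        int segmentSize = 32768; // default taskmanager.memory.segment-size in bytes
        int recordSize = 8;      // record size from the example above
        System.out.println(recordsPerBuffer(segmentSize, recordSize));
        System.out.println(backlogHours(segmentSize, recordSize, 1));
    }
}
```

Under these assumptions one buffer holds 4096 records, i.e. 4096 hours of work, 
which is why even a single default-sized buffer is far more in-flight data than 
such a workload needs.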

Piotrek

> On 23 May 2018, at 12:58, Fabian Hueske  wrote:
> 
> Hi Andrei,
> 
> With the current version of Flink, there is no general solution to this 
> problem.
> The upcoming version 1.5.0 of Flink adds a feature called credit-based flow 
> control which might help here.
> 
> I'm adding @Piotr to this thread who knows more about the details of this new 
> feature.
> 
> Best, Fabian
> 
> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski  >:
> Hi,
> 
> 
> Right now it is a Kafka source, but I had the same issue when reading data 
> from local FS.
> 
> It looks like a common problem for many (all?) sources.
> When incoming data is very small (paths to large archives) but each entry 
> requires a significant time to process (unpack, parse, etc.) Flink detects 
> the back pressure with delay and too much data becomes part of the first 
> transaction.
> 
> 
> 
> -- 
> Best regards,
> Andrei Shumanski
> 
> 
> 
> On Fri, May 18, 2018 at 11:44 AM, makeyang  > wrote:
> Andrei Shumanski:
> which source are u using?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 
> 



Re: Decrease initial source read speed

2018-05-23 Thread Piotr Nowojski
Hi,

Yes, if you have a mixed workload in your pipeline, it is a matter of finding 
the right balance. The situation will be better in Flink 1.5, but the underlying 
issue will remain: in 1.5.0 there will also be no way to change the network 
buffers configuration between stages of a single job.

Currently such an explosion of records (one small record producing a huge bunch 
of new records) is kind of an anti-pattern in Flink. Besides the problem that we 
were discussing, the other problem is that you cannot checkpoint in the middle 
of processing the big record. I hope that this will change in future Flink 
releases, but currently those are the limitations.

For your case, with the initial records being file paths, it might be better to 
embed this logic within a data source, so that your data source is already 
producing parsed records. For example, FlinkKafkaConsumer discovers 
topics/partitions on the fly, and the smallest transport unit is still a 
“parsed record” and not a “topic” (“file path” in your case). With a proper 
offsets implementation this also handles the problem of checkpointing in the 
middle of processing a large file.

Piotrek

> On 23 May 2018, at 15:10, Andrei Shumanski  wrote:
> 
> Hi Piotr,
> 
> Thank you very much for your response.
> I will try the new feature of Flink 1.5 when it is released.
> 
> But I am not sure minimising buffers sizes will work in all scenarios.
> If I understand correctly these settings are affecting the whole Flink 
> instance.
> 
> We might have a flow like this:
> 
> Source: Read file paths --> Unpack and parse files --> Analyse parsed data -> 
> ….
> 
> So it will be a very small amount of data at first step but quite a lot of 
> parsed data later.
> Changing buffer sizes globally will probably affect throughput of later 
> steps, as you wrote.
> 
> 
>> On 23 May 2018, at 14:48, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely 
>> you will be able to limit the “in flight” data, by controlling the number of 
>> assigned credits per channel/input gate. Even without any configuring Flink 
>> 1.5.0 will out of the box buffer less data, thus mitigating the problem.
>> 
>> There are some tweaks that you could use to make 1.4.x work better. With 
>> small records that require heavy processing, generally speaking you do not 
>> need huge buffers sizes to keep max throughput. You can try to both reduce 
>> the buffer pool and reduce the memory segment sizes:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers>
>>  • taskmanager.network.memory.fraction: Fraction of JVM memory to use 
>> for network buffers (DEFAULT: 0.1),
>>  • taskmanager.network.memory.min: Minimum memory size for network 
>> buffers in bytes (DEFAULT: 64 MB),
>>  • taskmanager.network.memory.max: Maximum memory size for network 
>> buffers in bytes (DEFAULT: 1 GB), and
>>  • taskmanager.memory.segment-size: Size of memory buffers used by the 
>> memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 
>> KiBytes)).
>> 
>> Reducing those values will reduce amount of in-flight data that will be 
>> caught between checkpoints. But keep in mind that smaller values can lead to 
>> smaller throughput, but as I said, with small number of heavy processing 
>> records this is not an issue. In an extreme example, if your records are 
>> lets say 8 bytes each and require 1 hour to process, there is almost no need 
>> for any buffering. 
>> 
>> Piotrek
>> 
>>> On 23 May 2018, at 12:58, Fabian Hueske >> <mailto:fhue...@gmail.com>> wrote:
>>> 
>>> Hi Andrei,
>>> 
>>> With the current version of Flink, there is no general solution to this 
>>> problem.
>>> The upcoming version 1.5.0 of Flink adds a feature called credit-based flow 
>>> control which might help here.
>>> 
>>> I'm adding @Piotr to this thread who knows more about the details of this 
>>> new feature.
>>> 
>>> Best, Fabian
>>> 
>>> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski >> <mailto:and...@shumanski.com>>:
>>> Hi,
>>> 
>>> 
>>> Right now it is a Kafka source, but I had the same issue when reading data 
>>> from local FS.
>>> 
>>> It looks like a common problem for many (all?) sources.
>>> When incoming data is v

Re: Decrease initial source read speed

2018-05-23 Thread Piotr Nowojski
One more remark. Currently there is an unwritten assumption in Flink that the 
time to process records is proportional to the number of bytes. As you noted, 
this breaks in the case of mixed workloads (especially with file paths sent as 
records). 

There is an interesting workaround for this problem, though. You could use a 
custom serializer for the file paths to artificially blow up the record size, 
for example to “segment-size” (32 KB), or even more. This is easy to do: for 
example, just pad the string with spaces. It would ensure that there is at most 
one file path to process per network buffer and would even out the imbalance 
caused by the assumption that processing time is proportional to the number of 
bytes.
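
The padding itself can be sketched in plain Java as follows. This is only an 
illustration of the encode/decode step, assuming a 32 KiB target size; 
`PaddedPathCodec` is a hypothetical helper and not a Flink serializer, so in a 
real job this logic would still have to be wrapped in a custom serializer for 
the path type.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PaddedPathCodec {
    // Target record size in bytes; chosen to match the default segment size.
    static final int TARGET_SIZE = 32 * 1024;

    // Encodes the path as UTF-8 and pads it with spaces up to TARGET_SIZE.
    static byte[] encode(String path) {
        byte[] raw = path.getBytes(StandardCharsets.UTF_8);
        if (raw.length >= TARGET_SIZE) {
            return raw; // already large enough, nothing to pad
        }
        byte[] padded = Arrays.copyOf(raw, TARGET_SIZE);
        Arrays.fill(padded, raw.length, TARGET_SIZE, (byte) ' ');
        return padded;
    }

    // Strips the trailing padding again and decodes the path.
    static String decode(byte[] bytes) {
        int end = bytes.length;
        while (end > 0 && bytes[end - 1] == ' ') {
            end--;
        }
        return new String(bytes, 0, end, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String path = "/data/archives/part-0001.tar.gz"; // made-up example path
        byte[] encoded = encode(path);
        System.out.println(encoded.length);
        System.out.println(decode(encoded).equals(path));
    }
}
```

One caveat of this simple scheme is that a path which itself ends in spaces 
would lose them on decoding; writing an explicit length prefix instead would 
avoid that.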

Piotrek

> On 23 May 2018, at 15:40, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Yes if you have mixed workload in your pipeline, it is matter of finding a 
> right balance. Situation will be better in Flink 1.5, but the underlying 
> issue will remain as well - in 1.5.0 there also will be no way to change 
> network buffers configuration between stages of the single job.
> 
> Currently such explosion of records (one small records producing huge bunch 
> of new records) is kind of anti pattern in Flink. Besides the problem that we 
> were discussing, the other problem is that you can not checkpoint in the 
> middle of processing the big record. I hope that this will change in future 
> Flink releases, but currently those are the limitations.
> 
> For your case, with initial records being file paths, it might be better to 
> embed this logic within a data source, so your data source is already 
> producing parsed records. For example FlinkKafkaConsumer is discovering 
> topics/partitions on the fly, and the smallest transport unit is still 
> “parsed record” and not a “topic” (“file path” in your case). With proper 
> offsets implementation this also handles the problem of checkpointing in the 
> middle of processing large file.
> 
> Piotrek
> 
>> On 23 May 2018, at 15:10, Andrei Shumanski > <mailto:and...@shumanski.com>> wrote:
>> 
>> Hi Piotr,
>> 
>> Thank you very much for your response.
>> I will try the new feature of Flink 1.5 when it is released.
>> 
>> But I am not sure minimising buffers sizes will work in all scenarios.
>> If I understand correctly these settings are affecting the whole Flink 
>> instance.
>> 
>> We might have a flow like this:
>> 
>> Source: Read file paths --> Unpack and parse files --> Analyse parsed data 
>> -> ….
>> 
>> So it will be a very small amount of data at first step but quite a lot of 
>> parsed data later.
>> Changing buffer sizes globally will probably affect throughput of later 
>> steps, as you wrote.
>> 
>> 
>>> On 23 May 2018, at 14:48, Piotr Nowojski >> <mailto:pi...@data-artisans.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely 
>>> you will be able to limit the “in flight” data, by controlling the number 
>>> of assigned credits per channel/input gate. Even without any configuring 
>>> Flink 1.5.0 will out of the box buffer less data, thus mitigating the 
>>> problem.
>>> 
>>> There are some tweaks that you could use to make 1.4.x work better. With 
>>> small records that require heavy processing, generally speaking you do not 
>>> need huge buffers sizes to keep max throughput. You can try to both reduce 
>>> the buffer pool and reduce the memory segment sizes:
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers>
>>> • taskmanager.network.memory.fraction: Fraction of JVM memory to use 
>>> for network buffers (DEFAULT: 0.1),
>>> • taskmanager.network.memory.min: Minimum memory size for network 
>>> buffers in bytes (DEFAULT: 64 MB),
>>> • taskmanager.network.memory.max: Maximum memory size for network 
>>> buffers in bytes (DEFAULT: 1 GB), and
>>> • taskmanager.memory.segment-size: Size of memory buffers used by the 
>>> memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 
>>> KiBytes)).
>>> 
>>> Reducing those values will reduce amount of in-flight data that will be 
>>> caught between checkpoints. But keep in mind that smaller values can lead 
>>> to smaller throughput, but as I said, with small number of heavy processing 
>>> records this is not an issue. In an extreme example, if yo

Re: FlinkKinesisProducer weird behaviour

2018-05-24 Thread Piotr Nowojski
Hi,

Have you tried to write the same records, with exactly the same configuration, 
to Kinesis, but outside of Flink (with some standalone Java application)?

Piotrek

> On 24 May 2018, at 09:40, Rafi Aroch  wrote:
> 
> Hi,
> 
> We're using Kinesis as the input & output of a job and experiencing a parsing 
> exception while reading from the output stream. All streams contain 1 shard 
> only.
> 
> While investigating the issue I noticed a weird behaviour where records get a 
> PartitionKey I did not assign and the record Data is being wrapped with 
> random illegal chars.
> 
> I wrote a very basic program to try to isolate the problem, but still I see 
> this happening:
> I wrote a simple SourceFunction which generates messages of the pattern - 
> -AAA\n
> FlinkKinesisProducer writes the messages to the Kinesis stream with a default 
> partitionKey of "0" - so I expect ALL records to have partitionKey of "0"
> To verify the records in the Kinesis stream I use AWS CLI get-records API and 
> see the following:
> 
> ...
> {
> "SequenceNumber": 
> "49584735873509122272926425626745413182361252610143420418",
> "ApproximateArrivalTimestamp": 1527144766.662,
> "Data": 
> "84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ",
> "PartitionKey": "a"
> },
> {
> "SequenceNumber": 
> "49584735873509122272926425626746622108180867308037603330",
> "ApproximateArrivalTimestamp": 1527144766.86,
> "Data": 
> "QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo=",
> "PartitionKey": "0"
> },
> ...
> 
> Where did PartitionKey "a" come from?
> 
> Furthermore, if you Base64-decode the data of the records, you see 
> that all records written with this PartitionKey "a" are wrapped with weird 
> illegal characters.
> For example:
> 
> $ echo 
> 84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ
>  | base64 --decode
> ��
> 0h79-
> h80-
> hC
>   �&EW��s�U�r
> 
> While the records with PartitionKey "0" look good:
> 
> $ echo 
> ODEtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo=
>  | base64 --decode
> 81-
> 
> 
> I tried using both 1.4.2 version & 1.6-SNAPSHOT and still see the issue...
> 
> Here is a link to the gist: 
> https://gist.github.com/aroch/7fb4219e7ada74f30654f1effe9d2f43 
> 
> 
> Am I missing anything? Has anyone encountered such issue?
> 
> Would appreciate any help,
> 
> Rafi



Re: Starting beam pipeline from savepoint

2018-05-24 Thread Piotr Nowojski
Hi,

I am not sure, but it probably boils down to the question of whether you can 
restore from a savepoint when using RemoteEnvironment, and the answer to this 
question is unfortunately no.

Maybe Aljoscha will know something more.

Piotrek

> On 23 May 2018, at 19:25, borisbolvig  wrote:
> 
> Is it possible to start Beam pipelines from savepoints when running on Flink
> (v.1.4)?
> 
> I am running flink in a remote environment, and executing the pipelines as
> .jars specifying the flink jobmanager via cmd line arguments. This is
> opposed to passing the .jar to `flink run`.
> 
> In this way the jar files are not directly available to the jobmanager, and
> indeed the API /jars returns empty. 
> 
> Now to restart from a savepoint (either using the REST API or CLI) it is
> expected that I pass the pipeline jar. However this is not available to the
> jobmanager.
> 
> Note that automatic restarting from checkpoints and creating savepoints is
> working.
> 
> Thank you for your help.
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: When is 1.5 coming out

2018-05-24 Thread Piotr Nowojski
It seems like FLINK-9349 will make it into 1.5, because RC5 was cancelled.

Piotrek

> On 24 May 2018, at 14:03, Vishal Santoshi  wrote:
> 
> Flink-9349 can we out this to 1.5 release.
> 
> On Thu, May 24, 2018, 7:59 AM Vishal Santoshi wrote:
> Thanks, I'll check it out. 
> 
> On Wed, May 23, 2018, 9:08 AM Fabian Hueske wrote:
> Hi Vishal,
> 
> Release candidate 5 (RC5) has been published and the voting period ends later 
> today.
> Unless we find a blocking issue, we can push the release out today.
> 
> FYI, if you are interested in the release progress, you can subscribe to the 
> dev mailing list (or just check out the archives at lists.apache.org) where 
> the release work is coordinated.
> 
> Best, Fabian
> 
> 2018-05-23 14:56 GMT+02:00 Vishal Santoshi:
> It has been some time and would want to know when it is officially a release. 
> 



Re: FlinkKinesisProducer weird behaviour

2018-05-24 Thread Piotr Nowojski
Hi,

I’m glad that you have figured it out.

Unfortunately it’s almost impossible to mention in our documentation all of the 
quirks of the connectors that we are using, since it would more or less come 
down to fully copying their documentation :( However, I created a small PR 
that mentions this issue:

https://github.com/apache/flink/pull/6072 

Please feel free to make further comments/suggestions there
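
For reference, turning aggregation off on the Flink side could look roughly like the sketch below. It only builds the Properties object that would be passed to FlinkKinesisProducer; the class name, method name and region value are made up for illustration. "AggregationEnabled" is the KPL setting name and "aws.region" is the key behind Flink's AWSConfigConstants.AWS_REGION, but please double check both against the connector version you are running.

```java
import java.util.Properties;

// Sketch only: builds the configuration that would be handed to
// FlinkKinesisProducer. Class and method names are hypothetical.
class KinesisProducerConfigSketch {

    static Properties producerConfig(String region) {
        Properties config = new Properties();
        // Region of the target stream.
        config.setProperty("aws.region", region);
        // KPL setting: write each user record as its own Kinesis record,
        // so consumers that do not de-aggregate can read the stream.
        config.setProperty("AggregationEnabled", "false");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig("us-east-1"));
    }
}
```

The trade-off is lower producer throughput, since the KPL no longer batches many user records into one Kinesis record.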

Thanks, Piotrek

> On 24 May 2018, at 14:35, Rafi Aroch  wrote:
> 
> Hi,
> 
> Thanks Piotr for your response.
> 
> I've further investigated the issue and found the root cause.
> 
> There are 2 possible ways to produce/consume records to/from Kinesis:
> 1. Using the Kinesis Data Streams service API directly
> 2. Using the KCL & KPL.
> The FlinkKinesisProducer uses the AWS KPL to push records into Kinesis, for 
> optimized performance. One of the features of the KPL is Aggregation, meaning 
> that it batches many UserRecords into one Kinesis Record to increase producer 
> throughput.
> The thing is that consumers of that stream need to be aware that the 
> records being consumed are aggregated and handle them accordingly [1][2].
> 
> In my case, the output stream is being consumed by Druid <http://druid.io/>. 
> So the consumer code is not under my control...
> So my choices are to disable the Aggregation feature by passing 
> AggregationEnabled: false in the Kinesis configuration or to write my own 
> custom consumer for Druid.
> 
> I think that we should state this as part of the documentation for Flink 
> Kinesis Connector.
> 
> [1] 
> https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html
> [2] 
> https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-integration.html
> 
> Thanks,
> Rafi
> 
> On Thu, May 24, 2018 at 11:18 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Have you tried to write the same records, with exactly the same configuration 
> to the Kinesis, but outside of Flink (with some standalone Java application)?
> 
> Piotrek
> 
> 
>> On 24 May 2018, at 09:40, Rafi Aroch > <mailto:rafi.ar...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> We're using Kinesis as our input & output of a job and experiencing parsing 
>> exception while reading from the output stream. All streams contain 1 shard 
>> only.
>> 
>> While investigating the issue I noticed a weird behaviour where records get 
>> a PartitionKey I did not assign and the record Data is being wrapped with 
>> random illegal chars.
>> 
>> I wrote a very basic program to try to isolate the problem, but still I see 
>> this happening:
>> I wrote a simple SourceFunction which generates messages of the pattern - 
>> -AAA\n
>> FlinkKinesisProducer writes the messages the Kinesis stream with a default 
>> partitionKey of "0" - so I expect ALL records to have partitionKey of "0"
>> To verify the records in the Kinesis stream I use AWS CLI get-records API 
>> and see the following:
>> 
>> ...
>> {
>> "SequenceNumber": 
>> "49584735873509122272926425626745413182361252610143420418",
>> "ApproximateArrivalTimestamp": 1527144766.662,
>> "Data": 
>> "84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ",
>> "PartitionKey": "a"
>> },
>> {
>> "SequenceNumber": 
>> "49584735873509122272926425626746622108180867308037603330",
>> "ApproximateArrivalTimestamp": 1527144766.86,
>> "Data": 
>> "QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo=",
>> "PartitionKey": "0"
>> },
>> ...
>> 
>> Where did PartitionKey "a" come from?
>> 
>> Further more, if you Base64 decode the record d

Re: Akka Http used in custom RichSourceFunction

2018-05-24 Thread Piotr Nowojski
Hi,

Please take a look at 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html

In particular, check whether you are using the child-first class loading config. 
If that doesn’t help, you should probably shade your akka dependency. 

What is the full exception? Is it thrown when YOUR code tries to shut down, or 
when FLINK’s code tries to shut down?

Piotrek

> On 24 May 2018, at 14:38, Niels van Kaam  wrote:
> 
> Hi All,
> 
> I wrote a custom source function (RichSourceFunction) which connects to a web 
> socket using the Akka Http Library. The job using this source runs fine on a 
> local environment until upon shutdown I see the following error in the log: 
> "Exception in thread "main" java.lang.NoSuchMethodError: 
> akka.actor.ActorSystem.shutdown()V"
> 
> My impression is the issue is caused by a version conflict between flink's 
> akka dependency and my own one (due to akka http). This seems to be related 
> to this issue: https://issues.apache.org/jira/browse/FLINK-9240 
> 
> 
> Can I somehow avoid this conflict?
> If not, does this mean I should avoid using Akka (or at least other versions 
> than Flink's) within my sources/sinks?
> Or can I safely catch and ignore the error? 
> 
> My dependencies are:
> Flink: 1.4.2
> akka-actor: 2.5.12
> akka-stream: 2.5.12
> akka-http: 10.1.1
> 
> Thank you for your help!
> 
> Cheers,
> Niels
> 
> 



Re: Multiple stream operator watermark handling

2018-05-24 Thread Piotr Nowojski
Hi,

Off the top of my head I can imagine two solutions:

1. Override the default behaviour of the operator, for example via 
org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK or maybe 
Watermark#MAX_WATERMARK - 1?
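
To illustrate why the second idea could work, here is a small plain-Java model of a two-input operator that forwards the minimum of the last watermark seen on each input. It is only a sketch and does not use any Flink classes; the class and method names are made up, and Long.MAX_VALUE stands in for the timestamp of Watermark#MAX_WATERMARK.

```java
// Plain-Java model of how a two-input operator combines watermarks.
// Not Flink code; all names are illustrative.
class TwoInputWatermarkSketch {
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;
    private long emitted = Long.MIN_VALUE;

    /** Records a watermark from input 1 and returns the operator's output watermark. */
    long onWatermark1(long timestamp) {
        watermark1 = timestamp;
        return advance();
    }

    /** Records a watermark from input 2 and returns the operator's output watermark. */
    long onWatermark2(long timestamp) {
        watermark2 = timestamp;
        return advance();
    }

    // The output watermark only moves forward when the minimum of both inputs does.
    private long advance() {
        long combined = Math.min(watermark1, watermark2);
        if (combined > emitted) {
            emitted = combined;
        }
        return emitted;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch idleControl = new TwoInputWatermarkSketch();
        // The control input never sent a watermark, so event time is stuck.
        System.out.println(idleControl.onWatermark1(1000L) == Long.MIN_VALUE);

        TwoInputWatermarkSketch maxedControl = new TwoInputWatermarkSketch();
        maxedControl.onWatermark2(Long.MAX_VALUE);
        // Now the output simply follows the primary input.
        System.out.println(maxedControl.onWatermark1(1000L) == 1000L);
    }
}
```

With the control input pinned at the maximum, the minimum is always decided by the primary stream, which is the same effect as ignoring the control stream's watermarks in a custom operator.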

Piotrek

> On 24 May 2018, at 16:07, Elias Levy  wrote:
> 
> Is there mechanism for a multiple stream operator to ignore watermarks from 
> one of the streams?
> 
> The use case is a multiple stream operator that consumes a primary stream and 
> a secondary control stream.  The control stream may only receive messages in 
> rare occasion, and possibly never.  The default behavior of the operator is 
> to only emit the lowest of the last watermark received from each input 
> stream.  That means that event time fails to advance if there are no control 
> messages.  
> 
> I also notice that FLIP-17, the Side Input proposal, does not address this 
> issue, either in the Wiki or in the Google Docs.
> 
> Assuming there is no currently prescribed way to handle this, are folks 
> taking care of this by introducing a new Assigner after the multiple input 
> operator to generate watermarks?
> 
> 



Re: When is 1.5 coming out

2018-05-24 Thread Piotr Nowojski
Hi,

There will be a new release candidate out today, but again, the exact release 
date depends on whether we find any release-blocking bugs. In the meantime you 
could try out and test the RC :)

Piotrek

> On 24 May 2018, at 16:33, Vishal Santoshi  wrote:
> 
> Thanks. Do we have a timeline for when we would be out with 1.5. It seems got 
> delayed again ?
> 
> On Thu, May 24, 2018 at 8:07 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> It seems like Flink-9349 will make it to the 1.5, because RC5 was cancelled.
> 
> Piotrek
> 
> 
>> On 24 May 2018, at 14:03, Vishal Santoshi > <mailto:vishal.santo...@gmail.com>> wrote:
>> 
>> Flink-9349 can we out this to 1.5 release.
>> 
>> On Thu, May 24, 2018, 7:59 AM Vishal Santoshi > <mailto:vishal.santo...@gmail.com>> wrote:
>> Thanks, I'll check it out. 
>> 
>> On Wed, May 23, 2018, 9:08 AM Fabian Hueske > <mailto:fhue...@gmail.com>> wrote:
>> Hi Vishal,
>> 
>> Release candidate 5 (RC5) has been published and the voting period ends 
>> later today.
>> Unless we find a blocking issue, we can push the release out today.
>> 
>> FYI, if you are interested in the release progress, you can subscribe to the 
>> dev mailing list (or just check out the archives at lists.apache.org 
>> <http://lists.apache.org/>) where the release work is coordinated.
>> 
>> Best, Fabian
>> 
>> 2018-05-23 14:56 GMT+02:00 Vishal Santoshi > <mailto:vishal.santo...@gmail.com>>:
>> It has been some time and would want to know when it is officially a 
>> release. 
>> 
> 
> 



Re: Multiple stream operator watermark handling

2018-05-25 Thread Piotr Nowojski
Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue that we are 
working to resolve in the future. The usually recommended workaround is to 
send a custom blank event (which should be ignored) once in a while.

I have expanded the documentation:
https://github.com/apache/flink/pull/6076 
Please check it, and if you have any further suggestions you are welcome to 
comment in the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy  wrote:
> 
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy  <mailto:fearsome.lucid...@gmail.com>> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> From top of my head I can imagine two solutions:
> 
> 1. Override the default behaviour of the operator via for example 
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
> 
> That seems the safer, but more complicated path.
> 
> As we had already implemented the business logic in a RichCoFlatMapFunction, 
> I ended up extending CoStreamFlatMap:
> 
> class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: 
> CoFlatMapFunction[IN1,IN2,OUT]) extends CoStreamFlatMap(flatMapper)  {
> 
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = 
> processWatermark(mark)
> 
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }
> 
> 
> Then it was easy to replace:
> 
> stream1
>   .connect(stream2)
>   .flatMap( new BusinessCoFlatMapFunction(params) )
> .name("Operator")
> .uid("op")
> 
> with:
> 
> stream1
>   .connect(stream2)
>   .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new 
> BusinessCoFlatMapFunction(params)))
>   .uid("op")
> 
> 



Re: Akka Http used in custom RichSourceFunction

2018-05-25 Thread Piotr Nowojski
Hi,

Yes, this might be the cause of the issue, because indeed it looks like your 
akka’s version is leaking to Flink’s classloader.

Piotrek

> On 25 May 2018, at 09:40, Niels van Kaam  wrote:
> 
> Hi Piotrek,
> 
> Thank you for your response!
> 
> I am currently just testing the job in a local environment. I think that 
> means all classes are in the Java classpath, which might also be the issue 
> then. 
> If I am correct that means I am currently not using dynamic classloading and 
> just overwriting the Akka version, also for Flink.
> 
> I will try moving my websocket connector to a seperate package and shade it's 
> Akka dependency.
> 
> Code that starts the job: 
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala
>  
> <https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala>
>  
> Dependencies: 
> https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt 
> <https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt> 
> 
> The full stack trace of the exception (I think this is the shutdown of the 
> Flink minicluster):
> 
> Exception in thread "main" java.lang.NoSuchMethodError: 
> akka.actor.ActorSystem.shutdown()V
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
>   at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)
> 
> 
> Cheers,
> Niels
> 
> 
> On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Please take a look on 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html>
>  . Especially check if you are using child-first class loading config. If it 
> doesn’t help, probably you should shade your akka dependency. 
> 
> What is the full exception? Is it thrown when YOURS code tries to shutdown, 
> or when FLINK’s code tries to shutdown?
> 
> Piotrek
> 
> 
>> On 24 May 2018, at 14:38, Niels van Kaam > <mailto:ni...@vankaam.net>> wrote:
>> 
>> Hi All,
>> 
>> I wrote a custom source function (RichSourceFunction) which connects to a 
>> web socket using the Akka Http Library. The job using this source runs fine 
>> on a local environment until upon shutdown I see the following error in the 
>> log: "Exception in thread "main" java.lang.NoSuchMethodError: 
>> akka.actor.ActorSystem.shutdown()V"
>> 
>> My impression is the issue is caused by a version conflict between flink's 
>> akka dependency and my own one (due to akka http). This seems to be related 
>> to this issue: https://issues.apache.org/jira/browse/FLINK-9240 
>> <https://issues.apache.org/jira/browse/FLINK-9240>
>> 
>> Can I somehow avoid this conflict?
>> If not, does this mean I should avoid using Akka (or at least other versions 
>> than Flink's) within my sources/sinks?
>> Or can I safely catch and ignore the error? 
>> 
>> My dependencies are:
>> Flink: 1.4.2
>> akka-actor: 2.5.12
>> akka-stream: 2.5.12
>> akka-http: 10.1.1
>> 
>> Thank you for your help!
>> 
>> Cheers,
>> Niels
>> 
>> 
> 



Re: Akka Http used in custom RichSourceFunction

2018-05-25 Thread Piotr Nowojski
Thanks for getting back and I’m glad that you were able to resolve your issue :)

Piotrek

> On 25 May 2018, at 11:25, Niels van Kaam  wrote:
> 
> Hi,
> 
> It was indeed the problem, and shading my akka dependency has solved the 
> problem. Thank you for pointing that out!
> 
> For references:
> 
> When shading akka you also need to merge the reference.conf files from akka, 
> or it will fail. This page contains useful documentation on how to shade 
> akka: 
> https://doc.akka.io/docs/akka/2.5.12/general/configuration.html#When_using_JarJar__OneJar__Assembly_or_any_jar-bundler
>  
> <https://doc.akka.io/docs/akka/2.5.12/general/configuration.html#When_using_JarJar__OneJar__Assembly_or_any_jar-bundler>
> 
> Example POM: https://github.com/nvankaam/websocketclient 
> <https://github.com/nvankaam/websocketclient>
> 
> Niels
> 
> 
> 
> On Fri, May 25, 2018 at 11:00 AM Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> Yes, this might be the cause of the issue, because indeed it looks like your 
> akka’s version is leaking to Flink’s classloader.
> 
> Piotrek
> 
> 
>> On 25 May 2018, at 09:40, Niels van Kaam > <mailto:ni...@vankaam.net>> wrote:
>> 
>> Hi Piotrek,
>> 
>> Thank you for your response!
>> 
>> I am currently just testing the job in a local environment. I think that 
>> means all classes are in the Java classpath, which might also be the issue 
>> then. 
>> If I am correct that means I am currently not using dynamic classloading and 
>> just overwriting the Akka version, also for Flink.
>> 
>> I will try moving my websocket connector to a seperate package and shade 
>> it's Akka dependency.
>> 
>> Code that starts the job: 
>> https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala
>>  
>> <https://github.com/nvankaam/flinkwebsocketsource/blob/master/src/main/scala/net/vankaam/flink/WebSocketSample.scala>
>>  
>> Dependencies: 
>> https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt 
>> <https://github.com/nvankaam/flinkwebsocketsource/blob/master/build.sbt> 
>> 
>> The full stack trace of the exception (I think this is the shutdown of the 
>> Flink minicluster):
>> 
>> Exception in thread "main" java.lang.NoSuchMethodError: 
>> akka.actor.ActorSystem.shutdown()V
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5$$anonfun$apply$7.apply(FlinkMiniCluster.scala:483)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$startInternalShutdown$5.apply(FlinkMiniCluster.scala:483)
>>  at scala.Option.foreach(Option.scala:257)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:482)
>>  at 
>> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:434)
>>  at 
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:112)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>>  at 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>>  at net.vankaam.flink.WebSocketSample$.main(WebSocketSample.scala:42)
>>  at net.vankaam.flink.WebSocketSample.main(WebSocketSample.scala)
>> 
>> 
>> Cheers,
>> Niels
>> 
>> 
>> On Thu, May 24, 2018 at 4:08 PM Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> Please take a look on 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/debugging_classloading.html>
>>  . Especially check if you are using child-first class l

Re: Odd job failure

2018-05-28 Thread Piotr Nowojski
Hi,

I think that’s unlikely to happen. As far as I know, the only way to actually 
unload the classes in JVM is when their class loader is garbage collected, 
which means all the references in the code to it must vanish. In other words, 
it should never happen that class is not found while anyone is still 
referencing it.

The most likely suspect is the standard Java problem of a dependency convergence 
issue. Please check that you are not pulling multiple Kafka versions into your 
class path. In particular, your job shouldn’t pull in any Kafka library other 
than the one that comes with flink-connector-kafka-0.11 (which is 0.11.0.2).

Please also consider upgrading your cluster to at least Kafka 0.11.0.2. Kafka 
0.11.0.0 was a pretty unstable release, and we do not support it. Our connector 
depends on the Kafka 0.11.0.2 client, and while I don’t assume that there is any 
incompatibility between a 0.11.0.0 cluster and a 0.11.0.2 client, it definitely 
wouldn’t hurt to upgrade the cluster.

Piotrek

> On 26 May 2018, at 17:58, Elias Levy  wrote:
> 
> Piotr & Stephan,
> 
> Thanks for the replies.  Apologies for the late response.  I've been 
> traveling for the past month.
> 
> We've not observed this issue (spilling) again, but it is good to know that 
> 1.5 will use back-pressure based alignment.  I think for now we'll leave 
> task.checkpoint.alignment.max-size as is and work towards moving to 1.5 once 
> we confirm it is stable.
> 
> As for the java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 error.  We see that one constantly 
> when jobs are canceled/restarted/upgraded.  We are using the 
> flink-connector-kafka-0.11 connector against a 0.11.0.0 cluster.  The error 
> indicates to me that the Kafka threads are not being fully shutdown and they 
> are trying to reload the NetworkClient class but failing, maybe because the 
> code is no longer accessible via the class loader or some other reason.  
> 
> It looks like others are observing the same error.  Alexander Smirnov 
> reported it here on the list last month as well.
> 
> 
> On Thu, May 3, 2018 at 1:22 AM, Stephan Ewen  <mailto:se...@apache.org>> wrote:
> Hi Elias!
> 
> Concerning the spilling of alignment data to disk:
> 
>   - In 1.4.x , you can set an upper limit via " 
> task.checkpoint.alignment.max-size ". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does not 
> spill any more.
> 
> Best,
> Stephan
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size>
> 
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> 
> It might be some Kafka issue. 
> 
> From what you described your reasoning seems sound. For some reason TM3 fails 
> and is unable to restart and process any data, thus forcing spilling on 
> checkpoint barriers on TM1 and TM2.
> 
> I don’t know the reason behind java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
> important in this case.
> 
> 1. What Kafka version are you using? Have you looked for any known Kafka 
> issues with those symptoms?
> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS 
> image? It might be some system issue.
> 
> Piotrek
> 
>> On 28 Apr 2018, at 01:54, Elias Levy > <mailto:fearsome.lucid...@gmail.com>> wrote:
>> 
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
>> failure the other day.  It seems that it started as some sort of network 
>> event.  
>> 
>> It began with the 3rd TM starting to warn every 30 seconds about socket 
>> timeouts while sending metrics to DataDog.  This latest for the whole outage.
>> 
>> Twelve minutes later, all TMs reported at nearly the same time that they had 
>> marked the Kafka coordinator as deed ("Marking the coordinator XXX (id: 
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
>> system attempted to recover it.  Then things got into a weird state.
>> 
>> The following related for six or seven times for a period of about 40 
>> minutes: 
>> TM attempts to restart the job, but only the first and second TMs show signs 
>> of doing so.  
>> The disk begins to fill up on TMs 1 and 2.  
>> TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
>> org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on 
>> this list earlier this month.  It is unclear if the are benign.
>>

Re: Odd job failure

2018-05-29 Thread Piotr Nowojski
Hi,

Could you post the full output of the mvn dependency:tree command for your project?
Can you reproduce this issue with a minimal project stripped of any 
custom code/external dependencies except for Flink itself?

Thanks Piotrek

> On 28 May 2018, at 20:13, Elias Levy  wrote:
> 
> On Mon, May 28, 2018 at 1:48 AM, Piotr Nowojski  <mailto:pi...@data-artisans.com>> wrote:
> The most likely suspect is the standard Java problem of a dependency 
> convergence issue. Please check that you are not pulling multiple Kafka 
> versions into your class path. In particular, your job shouldn’t pull in any 
> Kafka library other than the one that comes with flink-connector-kafka-0.11 
> (which is 0.11.0.2).
> 
> Alas, that is not the case.  The job correctly includes 
> kafka-clients:0.11.0.2:
> 
> [warn] Found version conflict(s) in library dependencies; some are suspected 
> to be binary incompatible:
> [warn] 
> [warn]* org.apache.kafka:kafka-clients:0.11.0.2 is selected over 
> {0.10.2.1, 0.9.0.1}
> [warn]+- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2 
> (depends on 0.11.0.2)
> [warn]+- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn]+- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn] 
> 
>  
> Please also consider upgrading your cluster to at least Kafka 0.11.0.2. Kafka 
> 0.11.0.0 was a pretty unstable release, and we do not support it. Our connector 
> depends on the Kafka 0.11.0.2 client, and while I don’t assume that there is any 
> incompatibility between a 0.11.0.0 cluster and a 0.11.0.2 client, it definitely 
> wouldn’t hurt to upgrade the cluster.
> 
> Thanks for the tip.  That said, this error should be unrelated to the version 
> of the cluster.
> 



Re: Question about JVM exit caused by timeout exception with the asynchronous IO of flink 1.4.2

2018-06-07 Thread Piotr Nowojski
Hi,

You can increase the timeout; that’s one way to tackle it. 

In Flink 1.6.0 it will be possible to override Flink’s default behaviour 
for timeouts, so that you can handle them instead of failing outright:
https://issues.apache.org/jira/browse/FLINK-7789 

Also, if you cannot wait for the new release, you could always copy 
AsyncWaitOperator together with AsyncFunction into your code base and apply the 
changes from the above-mentioned ticket 
(https://github.com/apache/flink/pull/6091/files).

Piotrek

> On 6 Jun 2018, at 10:39, 陈卓  wrote:
> 
> HI
> The asynchronous IO of flink 1.4.2 will throw timeout exception when the 
> timeout setting is one second and the invoke time setting is greater than 
> twenty seconds. Unfortunately the timeout exception cannot be captured, which 
> leads to abnormal exit of the process. So my question is how to deal with 
> this situation to keep the jvm executing.
>  
> the exception info:
> 
>  
> code as follows
> 
> -- 
> Thanks
> zhuo chen



Re: Extending stream events with a an aggregate value

2018-06-07 Thread Piotr Nowojski
Hi,

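No worries :) You probably need to write your own process function to do 
exactly that, maybe something like this:

// Type parameters below are inferred from the question:
// (Int, Double) in, (Int, Double, Double) out, keyed by field 0.
DataStream<Tuple2<Integer, Double>> test;

DataStream<Tuple3<Integer, Double, Double>> max = test.keyBy(0)
  .process(new KeyedProcessFunction<Tuple, Tuple2<Integer, Double>, 
      Tuple3<Integer, Double, Double>>() {
     // Running max for the current key
     public ValueState<Double> max;

     @Override
     public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Double> descriptor =
          new ValueStateDescriptor<>("max", TypeInformation.of(new 
TypeHint<Double>() {
          }));
        max = getRuntimeContext().getState(descriptor);
     }

     @Override
     public void processElement(Tuple2<Integer, Double> value, Context ctx, 
         Collector<Tuple3<Integer, Double, Double>> out) throws Exception {
        // ...
     }
  });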

You need to store max in state if you care about recovering from 
failures/restarts without losing the previous max value. Please check the online 
documentation for ProcessFunction and handling state in Flink :)
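
As a rough idea of what the elided processElement body could do, here is a plain-Java sketch with a HashMap standing in for the keyed ValueState. It is an assumption about the intended logic, not tested Flink code, and the class name is made up. In the real function you would read the state with ValueState#value(), write it with ValueState#update(...), and emit a Tuple3 through the Collector instead of returning an array.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the keyed process function; names are illustrative.
class RunningMaxSketch {
    // Stand-in for keyed ValueState<Double>: one running max per key.
    private final Map<Integer, Double> maxByKey = new HashMap<>();

    /** Returns {value, runningMax} for the given key. */
    double[] processElement(int key, double value) {
        Double current = maxByKey.get(key); // null for the first element of a key
        double updated = (current == null) ? value : Math.max(current, value);
        maxByKey.put(key, updated);
        return new double[] {value, updated};
    }

    public static void main(String[] args) {
        RunningMaxSketch sketch = new RunningMaxSketch();
        sketch.processElement(1, 2.0);
        double[] second = sketch.processElement(1, 1.0);
        // The second element keeps its own value and carries the max seen so far.
        System.out.println(second[0] + " " + second[1]);
    }
}
```

Together with the key, the returned pair gives the (Int, Double, Double) shape asked for in the question.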

Piotrek

> On 6 Jun 2018, at 15:55, Nicholas Walton  wrote:
> 
> I’m sure I’m being a complete idiot, since this seems so trivial but if 
> someone could point me in the right direction I’d be very grateful.
> 
> I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate 
> the running max of the stream no problem using “.max(2)”. But I want to 
> output the original input together with the running max value as [(Int, 
> Double, Double)]. I’ve hunted high and low for a means to do something so 
> trivial.
> 
> Nick Walton



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

A general solution for state/schema migration is under development and it might 
be released with Flink 1.6.0.

Before that, you need to handle the state migration manually in your operator’s 
open method. Let’s assume that your OperatorV1 has a state field “stateV1”. Your 
OperatorV2 defines a field “stateV2”, which is incompatible with the previous 
version. What you can do is add logic to the open method that checks:
1. If “stateV2” is non-empty, do nothing
2. If there is no “stateV2”, iterate over all of the keys and manually migrate 
“stateV1” to “stateV2”

In your OperatorV3 you could drop the support for “stateV1”.

I have once implemented something like that here:

https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
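
Independently of the linked code, the two steps can be sketched in plain Java with maps standing in for the keyed states. Everything here is hypothetical: the class name, the idea that V1 stored a count as text while V2 stores it as a long, and the choice to clear the old state after migrating.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a one-off state migration; not Flink code.
class StateMigrationSketch {
    // Hypothetical old format: count stored as text.
    final Map<String, String> stateV1 = new HashMap<>();
    // Hypothetical new format: count stored as a long.
    final Map<String, Long> stateV2 = new HashMap<>();

    /** Mirrors the check that would live in the operator's open method. */
    void open() {
        if (!stateV2.isEmpty()) {
            return; // step 1: already migrated, do nothing
        }
        // step 2: walk over all keys and convert each old entry
        for (Map.Entry<String, String> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), Long.parseLong(entry.getValue()));
        }
        stateV1.clear(); // assumption: the old state is no longer needed
    }

    public static void main(String[] args) {
        StateMigrationSketch operator = new StateMigrationSketch();
        operator.stateV1.put("user-a", "3");
        operator.open();
        System.out.println(operator.stateV2);
    }
}
```

Calling open() a second time is a no-op, which matters because the operator is opened again on every restart.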
 


Hope that helps!

Piotrek

> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
> 
> We are still pretty new to Flink and I have a conceptual / DevOps question.
> 
> When a job is modified and we want to deploy the new version, what is the 
> preferred method?  Our jobs have a lot of keyed state.
> 
> If we use snapshots we have old state that may no longer apply to the new 
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but that 
> can be very resource heavy for a while.
> 
> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
> selectively the keyed state?
> 
> Michael



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
What function are you implementing and how are you using it?

Usually it’s enough if your function implements RichFunction (or rather extends 
AbstractRichFunction); then you can use RichFunction#open in a 
similar manner to the code that I posted in my previous message. Flink in many 
places performs instanceof checks like this one in 
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:

public static void openFunction(Function function, Configuration parameters) 
throws Exception {
   if (function instanceof RichFunction) {
      RichFunction richFunction = (RichFunction) function;
      richFunction.open(parameters);
   }
}

Piotrek

> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> It seems that this was implemented by `Operator` API, which is a more low 
> level api compared to `Function` API.
> Since in `Function` API level we can only migrate state by event triggered, 
> it is more convenient in this way to migrate state by foreach all keys in 
> `open()` method.
> If I was implemented state operator by `ProcessFunction` API, is it possible 
> to port it to `KeyedProcessOperator` and do the state migration that you 
> mentioned?
> And are there something concerned and difficulties that will leads to 
> restored state failed or other problems? Thank you!
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski  <mailto:pi...@data-artisans.com>>:
> Hi,
> 
> General solution for state/schema migration is under development and it might 
> be released with Flink 1.6.0.
> 
> Before that, you need to manually handle the state migration in your 
> operator’s open method. Lets assume that your OperatorV1 has a state field 
> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
> with previous version. What you can do, is to add a logic in open method, to 
> check:
> 1. If “stateV2” is non empty, do nothing
> 2. If there is no “stateV2”, iterate over all of the keys and manually 
> migrate “stateV1” to “stateV2”
> 
> In your OperatorV3 you could drop the support for “stateV1”.
> 
> I have once implemented something like that here:
> 
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>  
> <https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258>
> 
> Hope that helps!
> 
> Piotrek
> 
> 
>> On 6 Jun 2018, at 17:04, TechnoMage > <mailto:mla...@technomage.com>> wrote:
>> 
>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>> 
>> When a job is modified and we want to deploy the new version, what is the 
>> preferred method?  Our jobs have a lot of keyed state.
>> 
>> If we use snapshots we have old state that may no longer apply to the new 
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but that 
>> can be very resource heavy for a while.
>> 
>> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
>> selectively the keyed state?
>> 
>> Michael
> 
> 



Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

Oh, I see now. Yes indeed, getKeyedStateBackend() is not exposed to the 
function and you cannot migrate your state that way.

As far as I know yes, at the moment in order to convert everything at once 
(without getKeys you can still implement lazy conversion) you would have to 
write your own operator.
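
A rough plain-Java model of what lazy conversion could look like: the old value for a key is converted only when an element for that key is processed. The maps are hypothetical stand-ins for the old and new keyed state, and the default of 0 for unseen keys is an assumption made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key (lazy) migration; not Flink API. */
class LazyMigrationSketch {
    // Hypothetical stand-ins for the old and the new keyed state.
    final Map<String, Integer> stateV1 = new HashMap<>();
    final Map<String, Long> stateV2 = new HashMap<>();

    /** Called with the key of the element currently being processed. */
    long currentValue(String key) {
        Long newValue = stateV2.get(key);
        if (newValue != null) {
            return newValue; // this key was already migrated (or created in V2)
        }
        // First access after the upgrade: convert the old value, if any.
        Integer oldValue = stateV1.remove(key);
        long migrated = (oldValue == null) ? 0L : oldValue.longValue();
        stateV2.put(key, migrated);
        return migrated;
    }
}
```

The trade-off is that keys which never receive another element keep their old-format state indefinitely.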

Piotrek

> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> I used `ProcessFunction` to implement it, but it seems that I can't call 
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is the method in 
> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
> Dose that mean I can't look up all keys and migrate the entire previous 
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
> migration state like the manner showed in `WindowOperator`? 
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski  <mailto:pi...@data-artisans.com>>:
> What function are you implementing and how are you using it?
> 
> Usually it’s enough if your function implements RichFunction (or rather 
> extends AbstractRichFunction) and then you can use RichFunction#open in 
> a similar manner to the code that I posted in my previous message. Flink in 
> many places performs instanceof checks like: 
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
> 
> public static void openFunction(Function function, Configuration parameters)
>         throws Exception {
>     if (function instanceof RichFunction) {
>         RichFunction richFunction = (RichFunction) function;
>         richFunction.open(parameters);
>     }
> }
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 11:07, Tony Wei > <mailto:tony19920...@gmail.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> It seems that this was implemented by `Operator` API, which is a more low 
>> level api compared to `Function` API.
>> Since in `Function` API level we can only migrate state by event triggered, 
>> it is more convenient in this way to migrate state by foreach all keys in 
>> `open()` method.
>> If I was implemented state operator by `ProcessFunction` API, is it possible 
>> to port it to `KeyedProcessOperator` and do the state migration that you 
>> mentioned?
>> And are there something concerned and difficulties that will leads to 
>> restored state failed or other problems? Thank you!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski > <mailto:pi...@data-artisans.com>>:
>> Hi,
>> 
>> General solution for state/schema migration is under development and it 
>> might be released with Flink 1.6.0.
>> 
>> Before that, you need to manually handle the state migration in your 
>> operator’s open method. Lets assume that your OperatorV1 has a state field 
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
>> with previous version. What you can do, is to add a logic in open method, to 
>> check:
>> 1. If “stateV2” is non empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>> migrate “stateV1” to “stateV2”
>> 
>> In your OperatorV3 you could drop the support for “stateV1”.
>> 
>> I have once implemented something like that here:
>> 
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>  
>> <https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258>
>> 
>> Hope that helps!
>> 
>> Piotrek
>> 
>> 
>>> On 6 Jun 2018, at 17:04, TechnoMage >> <mailto:mla...@technomage.com>> wrote:
>>> 
>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>> 
>>> When a job is modified and we want to deploy the new version, what is the 
>>> preferred method?  Our jobs have a lot of keyed state.
>>> 
>>> If we use snapshots we have old state that may no longer apply to the new 
>>> pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but that 
>>> can be very resource heavy for a while.
>>> 
>>> Is there an option I am missing?  Are there facilities to “patch” or 
>>> “purge” selectively the keyed state?
>>> 
>>> Michael
>> 
>> 
> 
> 



Re: Window + Reduce produces more than 1 output per window

2017-06-19 Thread Piotr Nowojski
Hi,

It is difficult for me to respond fully to your question. First of all it would 
be really useful if you could strip down your example to a minimal version that 
shows the problem. Unfortunately I was unable to reproduce your issue. I was 
getting only one output line per window (as expected). Could you try to print 
the output to the console (or use some different data sink) instead of writing it 
back to Kafka? Maybe the problem is there. Also please try removing some parts 
of the code bit by bit, so that you may be able to find what’s causing the 
problem.

As a side note I have a couple of concerns with your 
timestamps/watermarks/windows definitions. First you set the time 
characteristic to EventTime:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

But I don’t see where you are actually setting the timestamps/watermarks. Didn’t 
you want to use “.assignTimestampsAndWatermarks(…)” on your input DataStream 
based on its content? Nevertheless, later you specify the window by ProcessingTime:

>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

Which defines the windows independently of the content of those events. Maybe 
switching properly to EventTime will solve your problem?
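
To illustrate the difference, here is a small plain-Java model of how a tumbling window can be chosen for a record. This is a simplified sketch and not Flink's actual code; the class and method names are made up for the example, and timestamps are assumed to be non-negative milliseconds.

```java
/** Simplified model of tumbling window assignment; not Flink's implementation. */
class TumblingWindowSketch {
    /** Start of the window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Event time: the window depends only on the timestamp carried by the record. */
    static long eventTimeWindow(long recordTimestampMs, long sizeMs) {
        return windowStart(recordTimestampMs, sizeMs);
    }

    /** Processing time: the window depends on the wall clock when the record arrives. */
    static long processingTimeWindow(long sizeMs) {
        return windowStart(System.currentTimeMillis(), sizeMs);
    }
}
```

With 2-second windows, the input line "1 1497790546981" always falls into the event-time window starting at 1497790546000, no matter when it is processed, whereas its processing-time window changes from run to run.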

Thanks, Piotrek

> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN 
>  wrote:
> 
> Hello everybody! First of all, thanks for reading :D
> 
> I am currently working on my bachelor's final project which is a comparison 
> between Spark Streaming and Flink. Now let's focus on the problem:
> 
> - THE PROBLEM: the problem is that my program is writing to Kafka more than 
> once every window (is creating 2-3 or more lines per window, meanwhile it is 
> supposed to create 1 line per window as with the reduce function it lets only 
> one element). I have the same code written in Spark and it works perfectly. I 
> have been trying to find info about this issue and I haven't found anything 
> :(. Also I have been trying changing some functions' parallelism and some 
> more things and nothing worked, and I can not realise where can be the 
> problem.
> 
> - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1 
> JobManager, 2 TaskManagers (4 cores, 2GB RAM, 4 task slots each TaskManager).
> 
> - INPUT DATA: lines produced by one java producer to the Kafka 24 partitions' 
> topic with two elements: incremental value and creation timestamp:
> 1 1497790546981
> 2 1497790546982
> 3 1497790546983
> 4 1497790546984
> ..
> 
> - MY JAVA APPLICATION:
> + It reads from a Kafka topic with 24 partitions (Kafka is in the same 
> machine than the JobManager).
> + The filter functions are useless together with the union as I use them just 
> for checking their latency.
> + Basically, it adds a "1" to each line,then there is a tumbling window every 
> 2 seconds, and the  reduce function sum all this 1's and all the timestamps, 
> this last timestamp is later divided in the map function between the sum of 
> 1's which gives me the average, and finally in the last map function it adds 
> a timestamp of the current moment to each reduced line and the difference 
> between this timestamp and the average timestamp.
> + This line is written to Kafka (to a 2 partitions' topic).
> 
> # - CODE - 
> 
>//FLINK CONFIGURATION
>final StreamExecutionEnvironment env = StreamExecutionEnvironment
>.getExecutionEnvironment();
> 
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>//env.setParallelism(2);
> 
>//KAFKA CONSUMER CONFIGURATION
>Properties properties = new Properties();
>properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
>FlinkKafkaConsumer010 myConsumer = new 
> FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
> 
> 
>//KAFKA PRODUCER
>Properties producerConfig = new Properties();
>producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
>producerConfig.setProperty("acks", "0");
>producerConfig.setProperty("linger.ms", "0");
> 
> 
>//MAIN PROGRAM
>//Read from Kafka
>DataStream line = env.addSource(myConsumer);
> 
>//Add 1 to each line
>DataStream> line_Num = line.map(new NumberAdder());
> 
>//Filted Odd numbers
>DataStream> line_Num_Odd = line_Num.filter(new 
> FilterOdd());
> 
>//Filter Even numbers
>DataStream> line_Num_Even = line_Num.filter(new 
> FilterEven());
> 
>//Join Even and Odd
>DataStream> line_Num_U = 
> line_Num_Odd.union(line_Num_Even);
> 
>//Tumbling windows every 2 seconds
>AllWindowedStream, TimeWindow> windowedLine_Num_U 
> = line_Num_U
>.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
> 
>//Reduce to one line with the sum
>DataStream> wL_Num_U_Reduced = 
> windowedLine_Num_U.reduce(new Reducer());
> 
>//Calculate the average of the elements summed
>DataStream wL_Average = wL_Num_U_Reduc

Re: Window + Reduce produces more than 1 output per window

2017-06-19 Thread Piotr Nowojski
One more thing, please try to minimize your solution by removing this Union and 
Odd/Even filters at the beginning and check whether you get the same results.

Piotrek

> On Jun 19, 2017, at 2:43 PM, Piotr Nowojski  wrote:
> 
> Hi,
> 
> It is difficult for me to respond fully to your question. First of all it 
> would be really useful if you could strip down your example to a minimal 
> version that shows a problem. Unfortunately I was unable to reproduce your 
> issue. I was getting only one output line per window (as expected). Could you 
> try to print output to the console (or use some different data sink) instead 
> of writing it back to the Kafka, maybe there is a problem? Also please try 
> remove some parts of the code bit by bit, so that you may be able to find 
> what’s causing a problem.
> 
> As a side note I have couple of concerns with your 
> timestamps/watermarks/windows definitions. First you specify time 
> characteristic to an EventTime:
> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
> But I don’t see where you are actually setting the timestamp/watermarks. 
> Didn’t you want to use “.assignTimestampsAndWatermarks(…)” on your input 
> DataStream based on it’s content? Nevertheless, later you specify window by 
> ProcessingTime:
> 
>>  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
> 
> Which defines the windows independently of the content of those events. Maybe 
> switching properly to EventTime will solve your problem?
> 
> Thanks, Piotrek
> 
>> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN 
>>  wrote:
>> 
>> Hello everybody! First of all, thanks for reading :D
>> 
>> I am currently working on my bachelor's final project which is a comparison 
>> between Spark Streaming and Flink. Now let's focus on the problem:
>> 
>> - THE PROBLEM: the problem is that my program is writing to Kafka more than 
>> once every window (is creating 2-3 or more lines per window, meanwhile it is 
>> supposed to create 1 line per window as with the reduce function it lets 
>> only one element). I have the same code written in Spark and it works 
>> perfectly. I have been trying to find info about this issue and I haven't 
>> found anything :(. Also I have been trying changing some functions' 
>> parallelism and some more things and nothing worked, and I can not realise 
>> where can be the problem.
>> 
>> - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1 
>> JobManager, 2 TaskManagers (4 cores, 2GB RAM, 4 task slots each TaskManager).
>> 
>> - INPUT DATA: lines produced by one java producer to the Kafka 24 
>> partitions' topic with two elements: incremental value and creation 
>> timestamp:
>> 1 1497790546981
>> 2 1497790546982
>> 3 1497790546983
>> 4 1497790546984
>> ..
>> 
>> - MY JAVA APPLICATION:
>> + It reads from a Kafka topic with 24 partitions (Kafka is in the same 
>> machine than the JobManager).
>> + The filter functions are useless together with the union as I use them 
>> just for checking their latency.
>> + Basically, it adds a "1" to each line,then there is a tumbling window 
>> every 2 seconds, and the  reduce function sum all this 1's and all the 
>> timestamps, this last timestamp is later divided in the map function between 
>> the sum of 1's which gives me the average, and finally in the last map 
>> function it adds a timestamp of the current moment to each reduced line and 
>> the difference between this timestamp and the average timestamp.
>> + This line is written to Kafka (to a 2 partitions' topic).
>> 
>> # - CODE - 
>> 
>>   //FLINK CONFIGURATION
>>   final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>   .getExecutionEnvironment();
>> 
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>   //env.setParallelism(2);
>> 
>>   //KAFKA CONSUMER CONFIGURATION
>>   Properties properties = new Properties();
>>   properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
>>   FlinkKafkaConsumer010 myConsumer = new 
>> FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
>> 
>> 
>>   //KAFKA PRODUCER
>>   Properties producerConfig = new Properties();
>>   producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
>>   producerConfig.setProperty("acks", "0");
>>   producer

Re: Window + Reduce produces more than 1 output per window

2017-06-19 Thread Piotr Nowojski
No problem. Make sure that your application didn’t run in the background three 
times, thus producing 3x the expected output.

Piotrek

> On Jun 19, 2017, at 5:25 PM, FRANCISCO BORJA ROBLES MARTIN 
>  wrote:
> 
> Hello Piotrek!
> 
> Thanks for answering! Yes I have already changed the "TimeCharacteristic" to 
> "ProcessingTime". I need it for the ".setWriteTimestampToKafka(true)" option 
> as I use the timestamp in the Kafka consumer who reads this app's output. I 
> have already changed the code a bit for using KeyedStreams and be able to use 
> parallelism in the window/reduce functions.
> 
> About the problem, yesterday I noticed that the problem was growing as I did 
> more submits, it was doing x3 outputs (with small differences in each input 
> as you can see in my first message), but before it was doing x2 only. Finally 
> I stopped the cluster (stop-cluster.sh) and started it again 
> (start-cluster.sh) and the problem was solved. I have been trying to repeat 
> the problem submitting the app several times but I haven't achieved it today. 
> If it happens again I will try to repeat the problem with the smaller code as 
> possible to try to find where could be the possible bug (it seems to be 
> something wrong when submitting several times).
> 
> Kind regards!
> Fran.
> 
> 
> El 2017-06-19 14:43, Piotr Nowojski escribió:
>> Hi,
>> It is difficult for me to respond fully to your question. First of all
>> it would be really useful if you could strip down your example to a
>> minimal version that shows a problem. Unfortunately I was unable to
>> reproduce your issue. I was getting only one output line per window
>> (as expected). Could you try to print output to the console (or use
>> some different data sink) instead of writing it back to the Kafka,
>> maybe there is a problem? Also please try remove some parts of the
>> code bit by bit, so that you may be able to find what’s causing a
>> problem.
>> As a side note I have couple of concerns with your
>> timestamps/watermarks/windows definitions. First you specify time
>> characteristic to an EventTime:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> But I don’t see where you are actually setting the
>> timestamp/watermarks. Didn’t you want to use
>> “.assignTimestampsAndWatermarks(…)” on your input DataStream
>> based on it’s content? Nevertheless, later you specify window by
>> ProcessingTime:
>>>  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
>> Which defines the windows independently of the content of those events.
>> Maybe switching properly to EventTime will solve your problem?
>> Thanks, Piotrek



Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Piotr Nowojski
Hi,

I’m not sure how relevant this is, but recently I found and fixed a bug 
that under certain conditions was causing data loss for all of the 
FlinkKafkaProducers in Flink:

https://issues.apache.org/jira/browse/FLINK-6996 


Namely, the “flush” method was not being called on checkpoint. It should be fixed in 
the Flink 1.3.2 and 1.4 releases.
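
Why a missing flush can lose data is easiest to see in a toy model. The sketch below is plain Java, not the FlinkKafkaProducer code; all names are hypothetical. It assumes that after a crash the job resumes from the last checkpoint, so records that were only buffered when that checkpoint completed are never re-sent.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a buffering sink; not the actual producer implementation. */
class BufferingSinkSketch {
    final List<String> buffered = new ArrayList<>();  // handed to the sink, not yet written
    final List<String> delivered = new ArrayList<>(); // safely written to the external system

    void send(String record) {
        buffered.add(record);
    }

    void flush() {
        delivered.addAll(buffered);
        buffered.clear();
    }

    /** Passing false models the bug: the checkpoint completes with records still buffered. */
    void snapshotState(boolean flushOnCheckpoint) {
        if (flushOnCheckpoint) {
            flush();
        }
    }

    /** Simulates a crash right after the checkpoint: buffered records are gone. */
    void crash() {
        buffered.clear();
    }
}
```

In this model, flushing before the checkpoint completes is what makes "checkpoint succeeded" imply "everything sent so far was written".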

Piotrek

> On Jul 12, 2017, at 7:32 PM, ninad  wrote:
> 
> Hey guys, any update on this? If needed I can attach our code.
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p14224.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Piotr Nowojski
Oops, sorry, I forgot that this issue was relevant to FlinkKafkaProducer010 only.

Piotrek

> On Jul 13, 2017, at 9:33 AM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Ninad & Piotr,
> 
> AFAIK, when this issue was reported, Ninad was using 09.
> FLINK-6996 only affects Flink Kafka Producer 010, so I don’t think that’s the 
> cause here.
> 
> @Ninad
> Code to reproduce this would definitely be helpful here, thanks. If you 
> prefer to provide that privately, that would also be fine.
> 
> Cheers,
> Gordon
> 
> On 13 July 2017 at 4:13:07 PM, Piotr Nowojski (pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>) wrote:
> 
>> Hi,
>> 
>> I’m not sure how relevant is this, but recently I have found and fixed a 
>> bug, that in certain conditions was causing data losses for all of the 
>> FlinkKafkaProducers in Flink:
>> 
>> https://issues.apache.org/jira/browse/FLINK-6996 
>> <https://issues.apache.org/jira/browse/FLINK-6996>
>> 
>> Namely on checkpoint “flush” method was not being called. It should be fixed 
>> in Flink 1.3.2 and 1.4 releases.
>> 
>> Piotrek
>> 
>>> On Jul 12, 2017, at 7:32 PM, ninad >> <mailto:nni...@gmail.com>> wrote:
>>> 
>>> Hey guys, any update on this? If needed I can attach our code.
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p14224.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p14224.html>
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com <http://nabble.com/>.



Re: About KafkaConsumerBase

2017-08-01 Thread Piotr Nowojski
Hi,

pendingOffsetsToCommit is a private field which is not accessed from outside of 
the FlinkKafkaConsumerBase class. It is only used in state manipulation 
methods, which are not executed in parallel.

Thanks, Piotrek


> On Aug 1, 2017, at 1:20 PM, aitozi  wrote:
> 
> Hello:
> 
>i am new to Flink, ijust read the source code . i am doubt that , why in
> FlinkKafkaConsumerBase.java (version1.2),  like method :
> notifyCheckpointComplete  may change the pendingOffsetsToCommit in parallel
> , why dont need to be surrouned with "synchronized"  
> 
> thanks 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Kafka 0.11

2017-08-18 Thread Piotr Nowojski
Hi,

Yes, the Flink Connector for Kafka 0.10 should work without problems with Kafka 
0.11.

There is also pending work on a Kafka 0.11 connector that will add support 
for exactly-once semantics.

Piotrek   

> On Aug 18, 2017, at 5:21 PM, Gabriele Di Bernardo 
>  wrote:
> 
> Hi guys, 
> 
> Is the Flink Connector Kafka 0.10 fully compatible with Kafka 0.11?
> 
> Thank you in advance.
> 
> Best,
> 
> 
> Gabriele



Re: Modify field topics (KafkaConsumer) during runtime

2017-08-31 Thread Piotr Nowojski
Hi,

As far as I know it is not possible to do it on the fly. There is a planned 
feature for discovering topics using a regex:
https://github.com/apache/flink/pull/3746 

https://issues.apache.org/jira/browse/FLINK-5704 

But it is not completed yet.

Thanks, Piotrek

> On Aug 30, 2017, at 3:31 PM, Jose Miguel Tejedor Fernandez 
>  wrote:
> 
> Hi,
> 
> I am using Flink version 1.3.1.
> 
> I am wondering if it is possible to add/delete new topics to 
> FlinkKafkaConsumer during execution of a job? Otherwise, I guess I need to 
> cancel the job and redeploy the new job.
> Cheers
> 
> BR



Re: Very low-latency - is it possible?

2017-08-31 Thread Piotr Nowojski
Achieving 1ms in any distributed system might be problematic, because even the 
simplest ping messages between worker nodes take ~0.2ms.

However, since your desired throughput (40k records/s) and your state are both 
small, maybe there is no need to use a distributed system for that? You 
could try running a single-node Flink instance (or a 2-node instance with 
parallelism set to 1, just for automatic failure recovery). 

As Jörn wrote earlier, it might be simpler to write a custom standalone Java 
application for that. As long as your state fits into the memory of a 
single node, you should easily be able to process millions of records per 
second on a single machine. 

Piotrek

> On Aug 31, 2017, at 3:01 PM, Jörn Franke  wrote:
> 
> If you really need to get that low something else might be more suitable. 
> Given the times a custom solution might be necessary. Flink is a generic 
> powerful framework - hence it does not address these latencies. 
> 
>> On 31. Aug 2017, at 14:50, Marchant, Hayden  wrote:
>> 
>> We're about to get started on a 9-person-month PoC using Flink Streaming. 
>> Before we get started, I am interested to know how low-latency I can expect 
>> for my end-to-end flow for a single event (from source to sink). 
>> 
>> Here is a very high-level description of our Flink design: 
>> We need at least once semantics, and our main flow of application is parsing 
>> a message ( < 50 microseconds) from Kafka, and then doing a keyBy on the 
>> parsed event ( <1kb) and then updating a very small user state in the 
>> KeyedStream, and then doing another keyBy and then operator of that 
>> KeyedStream. Each of the operators is a very simple operation - very little 
>> calculation and no I/O.
>> 
>> 
>> ** Our requirement is to get close to 1ms (99%) or lower for end-to-end 
>> processing (timer starts once we get message from Kafka). Is this at all 
>> realistic if are flow contains 2 aggregations?  If so, what optimizations 
>> might we need to get there regarding cluster configuration (both Flink and 
>> Hardware). Our throughput is possibly small enough (40,000 events per 
>> second) that we could run on one node - which might eliminate some network 
>> latency. 
>> 
>> I did read in 
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>  in Exactly Once vs At Least Once that a few milliseconds is considered 
>> super low-latency - wondering if we can get lower.
>> 
>> Any advice or 'war stories' are very welcome.
>> 
>> Thanks,
>> Hayden Marchant
>> 
>> 



Re: Bucketing/Rolling Sink: New timestamp appeded to the part file name everytime a new part file is rolled

2017-08-31 Thread Piotr Nowojski
Hi,

BucketingSink doesn’t support the feature that you are requesting; you cannot 
specify a dynamically generated prefix/suffix.
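
As for why the timestamp in the quoted snippet never changes: the prefix is an ordinary String that is built once, when setPartPrefix is called, so the clock is read only at that moment. The plain-Java sketch below shows the difference between such a constant and something that re-reads the clock. The Supplier variant is purely hypothetical and is not a hook that BucketingSink offers.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Illustrates eager vs. deferred evaluation of a part-file prefix. */
class PartPrefixSketch {
    /** What the quoted snippet does: the clock is read once and baked into a String. */
    static String eagerPrefix(LongSupplier clock) {
        return "PART-FILE-" + Long.toString(clock.getAsLong());
    }

    /** Hypothetical alternative: the clock is re-read every time a prefix is requested. */
    static Supplier<String> lazyPrefix(LongSupplier clock) {
        return () -> "PART-FILE-" + Long.toString(clock.getAsLong());
    }
}
```

Calling eagerPrefix(System::currentTimeMillis) once and reusing the result is equivalent to the code in the question, which is why every rolled part file gets the same prefix.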

Piotrek

> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli  
> wrote:
> 
>  
> Hi,
>  
> I have a flink application that is streaming data into HDFS and I am using 
> Bucketing Sink for that. And, I want to know if is it possible to rename the 
> part files that is being created in the base hdfs directory.
>  
> Right now I am using the below code for including the timestamp into 
> part-file name, but the problem I am facing is the timestamp is not changing 
> for the new part file that is being rolled!
>  
>  
> BucketingSink HdfsSink = new BucketingSink (hdfsOutputPath);
> 
> HdfsSink.setBucketer(new BasePathBucketer());
> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 
> 'hdfsOutputBatchSizeInMB' MB
> HdfsSink.setPartPrefix("PART-FILE-" + 
> Long.toString(System.currentTimeMillis()));
>  
>  
> Can someone please suggest me, what code changes I can try so that I get a 
> new timestamp for every part file that is being rolled new?
>  
>  
> Thanks a lot. 
>  
> Regards,
> Raja.



Re: How flink monitor source stream task(Time Trigger) is running?

2017-09-29 Thread Piotr Nowojski
We use Akka's DeathWatch mechanism to detect dead components.

TaskManager failure shouldn’t prevent recovering from state (as long as there 
are enough task slots).

I’m not sure if I understand what you mean by a "source stream thread" crash. If 
it was an error while performing a checkpoint, so that it didn’t complete, 
Flink will not be able to recover from such an incomplete checkpoint.

Could you share the logs showing your issue?

Thanks, Piotrek

> On Sep 29, 2017, at 7:30 AM, yunfan123  wrote:
> 
> In my understanding, flink just use task heartbeat to monitor taskManager is
> running.
> If source stream (Time Trigger for XXX)thread is crash, it seems flink can't
> recovery from this state?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


