Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Jark Wu
Hi,

I think what Baswaraj wants is exactly something like the Storm Distributed Cache 
API [1] (if I’m not misunderstanding). 

> The distributed cache feature in storm is used to efficiently distribute 
> files (or blobs, which is the equivalent terminology for a file in the 
> distributed cache and is used interchangeably in this document) that are 
> large and can change during the lifetime of a topology, such as geo-location 
> data, dictionaries, etc. Typical use cases include phrase recognition, entity 
> extraction, document classification, URL re-writing, location/address 
> detection and so forth. Such files may be several KB to several GB in size. 
> For small datasets that don't need dynamic updates, including them in the 
> topology jar could be fine. But for large files, the startup times could 
> become very large. In these cases, the distributed cache feature can provide 
> fast topology startup, especially if the files were previously downloaded for 
> the same submitter and are still in the cache. This is useful with frequent 
> deployments, sometimes few times a day with updated jars, because the large 
> cached files will remain available without changes. The large cached blobs 
> that do not change frequently will remain available in the distributed cache.

We can look into whether this is a common use case and how to implement it 
in Flink. 

[1] http://storm.apache.org/releases/2.0.0-SNAPSHOT/distcache-blobstore.html 
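
As a point of comparison, the DataSet API already covers the static part of this: a file registered with the ExecutionEnvironment is shipped to every TaskManager and can be read in a rich function. Below is a minimal sketch, assuming a batch job (the path and cache name are placeholders); it does not cover updating the file while the job is running, which is the part the Storm feature adds.

```java
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class CachedFileSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register a file (local, HDFS, or S3 path); Flink copies it to each TaskManager.
        env.registerCachedFile("hdfs:///path/to/geo-data.txt", "geoData");

        DataSet<String> events = env.fromElements("a", "b", "c");

        events.map(new RichMapFunction<String, String>() {
            private transient File geoData;

            @Override
            public void open(Configuration parameters) {
                // The registered file is available as a local file on every TaskManager.
                geoData = getRuntimeContext().getDistributedCache().getFile("geoData");
            }

            @Override
            public String map(String value) {
                return value + " (enriched using " + geoData.getName() + ")";
            }
        }).print();
    }
}
```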

 

- Jark Wu 

> On 23 Aug 2016, at 9:45 PM, Lohith Samaga M  wrote:
> 
> Hi
> Maybe you could use Cassandra to store and fetch all such reference data.  
> This way the reference data can be updated without restarting your 
> application.
> 
> Lohith
> 
> Sent from my Sony Xperia™ smartphone
> 
> 
> 
>  Baswaraj Kasture wrote 
> 
> Thanks Kostas !
> I am using DataStream API.
> 
> I have a few config/property files (key-value text files) and also business 
> rule files (JSON).
> These rules and configurations are needed when we process incoming events.
> Is there any way to share them with the task nodes from the driver program?
> I think this is a very common use case and I am sure other users face similar 
> issues.
> 
> +Baswaraj
> 
> On Mon, Aug 22, 2016 at 4:56 PM, Kostas Kloudas  > wrote:
> Hello Baswaraj,
> 
> Are you using the DataSet (batch) or the DataStream API?
> 
> If you are in the first, you can use a broadcast variable for your task.
> If you are using the DataStream one, then there is no proper support for that.
> 
> Thanks,
> Kostas
> 
>> On Aug 20, 2016, at 12:33 PM, Baswaraj Kasture > > wrote:
>> 
>> I am running a Flink standalone cluster.
>> 
>> I have a text file that needs to be shared across tasks when I submit my 
>> application.
>> In other words, put this text file in the class path of the running tasks.
>> 
>> How can we achieve this with Flink?
>> 
>> In Spark, spark-submit has a --jars option that puts all the specified files 
>> in the class path of the executors (executors run in separate JVMs and are 
>> spawned dynamically, so it is possible).
>> 
>> Flink's task managers run tasks in separate threads under the TaskManager JVM (?), 
>> so how can we make this text file accessible to all tasks spawned by 
>> the current application?
>> 
>> Using HDFS, NFS, or including the file in the program jar is one way that I know, 
>> but I am looking for a solution that allows me to provide the text file at run time 
>> and still have it accessible in all tasks.
>> Thanks.
> 
> 
> 
> 



Dealing with Multiple sinks in Flink

2016-08-23 Thread Vinay Patil
Hi,

In our Flink pipeline we are currently writing the data to multiple S3
objects/folders based on some conditions. The issue I am facing is as
follows:

Consider these S3 folders :
temp_bucket/processedData/20160823/
temp_bucket/rawData/20160822/
temp_bucket/errorData/20160821/

Now, when the parallelism is set to 1, the data gets written to all of the S3
folders above, but when I set it to a larger value the data is written only
to the first folder and not the others.

I am testing the Flink job on EMR with 4 task managers having 16 slots;
even if I keep the parallelism at 4, I am facing the same issue.
(Running from the IDE results in the same output; tested this with Flink 1.0.3
and 1.1.1.)

I don't understand why this is happening.


Regards,
Vinay Patil
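
For reference, a minimal sketch of the kind of conditional fan-out described above, using one filter per branch and a separate sink for each (the record type, predicates, and the use of writeAsText are illustrative assumptions, not the actual job):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

public class FanOutSketch {

    // Hypothetical record type with a category field used for routing.
    public static class Record {
        public String category;
        public String payload;
    }

    public static void wire(DataStream<Record> input) {
        // Each branch filters the full stream, so with parallelism > 1 every
        // branch should still receive the records matching its predicate.
        input.filter(r -> "processed".equals(r.category))
             .writeAsText("s3://temp_bucket/processedData/20160823/");

        input.filter(r -> "raw".equals(r.category))
             .writeAsText("s3://temp_bucket/rawData/20160822/");

        input.filter(r -> "error".equals(r.category))
             .writeAsText("s3://temp_bucket/errorData/20160821/");
    }
}
```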


How to get latency info from benchmark

2016-08-23 Thread Eric Fukuda
Hi,

I'm trying to benchmark Flink without Kafka as mentioned in this post (
http://data-artisans.com/extending-the-yahoo-streaming-benchmark/). After
running flink.benchmark.state.AdvertisingTopologyFlinkState with
user.local.event.generator in localConf.yaml set to 1, I ran
flink.benchmark.utils.AnalyzeTool giving flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log
as a command-line argument. I got
the following output and it does not have the information about the latency.


= Latency (0 reports ) =
= Throughput (1 reports ) =
== null (entries: 10150)===
Mean throughput 639078.5018497099
Exception in thread "main" java.lang.IndexOutOfBoundsException: toIndex = 2
at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
at java.util.ArrayList.subList(ArrayList.java:954)
at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)


Reading the code in AnalyzeTool.java, I found that it's looking for lines
that include "Latency" in the log file, but apparently it's not finding
any. I tried grepping the log file, and couldn't find any either. I have
one server that runs both the JobManager and a TaskManager, and another server
that runs Redis; the two are connected over the network.

I think I have to do something to read the data stored in Redis before
running AnalyzeTool, but can't figure out what. Does anyone know how to get
the latency information?

Thanks,
Eric


Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Robert Metzger
Hi,

the problem is that you are using the wrong Namenode port. The port
is 8020, not 50070.
On EMR, you actually don't need to specify the Namenode port at all.

This command works for me:

[hadoop@ip-172-31-23-104 ~]$ flink run -m yarn-cluster -yn 2
/usr/lib/flink/examples/streaming/WordCount.jar --input
hdfs:///user/hadoop/test10Mb.db --output hdfs:///user/hadoop/out
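
For reference, the same two URI forms can also be used when reading inside a job; a minimal sketch (host name and paths are placeholders, not taken from this thread):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsUriSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Let fs.defaultFS resolve the namenode, as on EMR:
        DataStream<String> viaDefaultFs = env.readTextFile("hdfs:///user/hadoop/test10Mb.db");

        // Or name the namenode explicitly, using the IPC port (8020 by default),
        // not the web UI port 50070:
        DataStream<String> viaExplicitHost =
                env.readTextFile("hdfs://namenode-host:8020/user/hadoop/test10Mb.db");

        viaDefaultFs.print();
        viaExplicitHost.print();
        env.execute("hdfs uri sketch");
    }
}
```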

I hope that does the trick.

Let me know if you need further help.

Regards,
Robert


On Tue, Aug 23, 2016 at 8:57 PM, Foster, Craig  wrote:

> I am 99% sure they are the same since this exists in the EMR yum repo on the
> cluster.
>
>
>
>
>
>
>
> *From: *Stephan Ewen 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Tuesday, August 23, 2016 at 11:47 AM
> *To: *"user@flink.apache.org" , Robert Metzger <
> rmetz...@data-artisans.com>
>
> *Subject: *Re: WordCount w/ YARN and EMR local filesystem and/or HDFS
>
>
>
> I would have to pull in Robert into the loop, but my first guess is that
> this is a Hadoop version mismatch.
>
>
>
> Can you double check that the Hadoop Version for which you download Flink
> is the same as the one on the cluster?
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig 
> wrote:
>
> Ah cool, that makes sense then. So you must always stage your files in a
> HDFS, S3, etc. I can live with that ;)
>
>
>
> Then here is my HDFS exception...maybe I've got to configure something
> differently? I think I am using a proper HDFS URI.
>
> For HDFS, I tried this but got a protobuf exception:
>
> % hadoop fs -ls /user/hadoop/
>
> Found 2 items
>
> drwxrwxrwx   - hadoop hadoop  0 2016-08-23 17:49
> /user/hadoop/.flink
>
> -rw-r--r--   1 hadoop hadoop  15419 2016-08-23 14:56
> /user/hadoop/LICENSE.txt
>
>
>
> % flink run -m yarn-cluster -yn 2 
> /usr/lib/flink/examples/streaming/WordCount.jar
> --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/
> user/hadoop/LICENSE.txt
>
>
>
> 
>
> 
>
> The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74
> (Streaming WordCount)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.streaming.api.environment.
> StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>
> at org.apache.flink.streaming.examples.wordcount.WordCount.
> main(WordCount.java:94)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
>
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(
> CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1192)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> $
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1120)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.
> applyOrElse(YarnJobManager.scala:153)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> 

Re: sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
Another approach I'm considering, which feels pretty kludgy, but I think
should be acceptable for my current use:

Only one stateful op, keyed on the same field, but with a flag field
indicating the actual operation to be performed.  The results of this op
are output to a Kafka (or whatever) queue, which is ingested along with the
first stream.  The two state changes don't have to be atomic for my case,
but the second one does have to be guaranteed to eventually happen, and be
idempotent.  I'm not quite sure how to (safely) make that second pass
idempotent though, at the moment, and I'm not sure if there might be other
issues with it I'm not seeing - it definitely doesn't _feel_ like a great
solution.

Any thoughts?

On Tue, Aug 23, 2016 at 11:53 AM, Michael Warnock 
wrote:

> Thanks for the quick response!
>
> I've been wondering about Connected streams and CoFlatMap, but either I
> don't see all the ways they can be used, or they don't solve my problem.
> Do you know of any examples outside of the documentation?  My searches for
> "flink comap example" and similar haven't turned anything up.
>
> On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> This is a tricky one. State access and changes are not shared across
>> operators in Flink.
>> We chose that design because it makes it possible to work on "local"
>> state in each operator
>>   - state automatically shards with the computation
>>   - no locking / concurrency implications
>>   - asynchronous persistence
>>
>> Sharing state between two operations in the same stage works with the
>> CoMap / CoFlatMap functions.
>> Sharing state across successive nodes does not work, because the
>> functions could be executed on different machines, and that would require
>> remote and synchronized state updates.
>>
>> Do you think you can use the CoMap / CoFlatMap functions for this?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock 
>> wrote:
>>
>>> I'm trying to do something that seems like it should be possible, but my
>>> implementation doesn't behave as expected, and I'm not sure how else to
>>> express it.
>>>
>>> Let's say the stream is composed of tuples like this: (Alice, Bob, 1)
>>> and I want to keyBy(1), flatMap with state associated with Alice, then
>>> keyBy(2) with state associated with Bob.  The trick is, when I later get a
>>> tuple like (Bob, Alice, 1), I want the first operator to see the state that
>>> was updated in the second op previously.  Is this possible?  I tried
>>>  implementing both operators as one, getting the state by descriptor in the
>>> flatMap body, and even instantiating the operator only once; the behavior
>>> is, as you might guess, that the state in stage 1 doesn't include changes
>>> made previously in stage 2.
>>>
>>> Is there any way to do this without throwing away the parallelism?
>>>
>>> Thanks in advance!
>>> ~Michael
>>>
>>
>>
>


Re: sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
Thanks for the quick response!

I've been wondering about Connected streams and CoFlatMap, but either I
don't see all the ways they can be used, or they don't solve my problem.
Do you know of any examples outside of the documentation?  My searches for
"flink comap example" and similar haven't turned anything up.

On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen  wrote:

> Hi!
>
> This is a tricky one. State access and changes are not shared across
> operators in Flink.
> We chose that design because it makes it possible to work on "local" state
> in each operator
>   - state automatically shards with the computation
>   - no locking / concurrency implications
>   - asynchronous persistence
>
> Sharing state between two operations in the same stage works with the
> CoMap / CoFlatMap functions.
> Sharing state across successive nodes does not work, because the functions
> could be executed on different machines, and that would require remote and
> synchronized state updates.
>
> Do you think you can use the CoMap / CoFlatMap functions for this?
>
> Greetings,
> Stephan
>
>
> On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock 
> wrote:
>
>> I'm trying to do something that seems like it should be possible, but my
>> implementation doesn't behave as expected, and I'm not sure how else to
>> express it.
>>
>> Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and
>> I want to keyBy(1), flatMap with state associated with Alice, then keyBy(2)
>> with state associated with Bob.  The trick is, when I later get a tuple
>> like (Bob, Alice, 1), I want the first operator to see the state that was
>> updated in the second op previously.  Is this possible?  I tried
>>  implementing both operators as one, getting the state by descriptor in the
>> flatMap body, and even instantiating the operator only once; the behavior
>> is, as you might guess, that the state in stage 1 doesn't include changes
>> made previously in stage 2.
>>
>> Is there any way to do this without throwing away the parallelism?
>>
>> Thanks in advance!
>> ~Michael
>>
>
>


Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Stephan Ewen
I would have to pull in Robert into the loop, but my first guess is that
this is a Hadoop version mismatch.

Can you double check that the Hadoop Version for which you download Flink
is the same as the one on the cluster?

Greetings,
Stephan


On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig  wrote:

> Ah cool, that makes sense then. So you must always stage your files in a
> HDFS, S3, etc. I can live with that ;)
>
>
>
> Then here is my HDFS exception...maybe I've got to configure something
> differently? I think I am using a proper HDFS URI.
>
> For HDFS, I tried this but got a protobuf exception:
>
> % hadoop fs -ls /user/hadoop/
>
> Found 2 items
>
> drwxrwxrwx   - hadoop hadoop  0 2016-08-23 17:49
> /user/hadoop/.flink
>
> -rw-r--r--   1 hadoop hadoop  15419 2016-08-23 14:56
> /user/hadoop/LICENSE.txt
>
>
>
> % flink run -m yarn-cluster -yn 2 
> /usr/lib/flink/examples/streaming/WordCount.jar
> --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/
> user/hadoop/LICENSE.txt
>
>
>
> 
>
> 
>
> The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74
> (Streaming WordCount)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.streaming.api.environment.
> StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>
> at org.apache.flink.streaming.examples.wordcount.WordCount.
> main(WordCount.java:94)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
>
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(
> CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1192)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> $
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1120)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.
> applyOrElse(YarnJobManager.scala:153)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>
> at org.apache.flink.runtime.jobmanager.JobManager.
> aroundReceive(JobManager.scala:107)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>
>

Re: JobManager HA without Distributed FileSystem

2016-08-23 Thread Stephan Ewen
Hi!

The state one can store in ZooKeeper is only very small (the recommendation is
smaller than 1 MB per handle).

For HA, the JobManager needs to persist:
  - JobGraph
  - JAR files
  - Checkpoint Metadata

Those are easily too large for ZooKeeper, which is why Flink currently
requires a DFS to store those, and only stores "pointers" to the data in
the DFS in ZooKeeper.

Are you thinking of another highly available storage for larger data
(megabytes) that could be used here?

Greetings,
Stephan


On Tue, Aug 23, 2016 at 6:36 PM, Konstantin Knauf <
konstantin.kn...@tngtech.com> wrote:

> Hi all,
>
> the documentation of JobManager HA [1] explains that HA is only possible
> with the FS state backend as Job Manager metadata is saved there.
>
> What are the particular problems using JobManager HA with the
> MemoryStatebackend?
>
> As I understand it, the state is checkpointed to all JobManagers
> (leaders + standby) when using the MemoryStateBackend, or am I wrong here?
>
> Follow Up Question: Is it generally possible to setup a highly
> available, at-least-once (source: Kafka) pipeline without a distributed
> filesystem (only local FS and Zookeeper) for the checkpoints?
>
> Cheers,
>
> Konstantin
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-
> master/setup/jobmanager_high_availability.html
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: sharded state, 2-step operation

2016-08-23 Thread Stephan Ewen
Hi!

This is a tricky one. State access and changes are not shared across
operators in Flink.
We chose that design because it makes it possible to work on "local" state
in each operator
  - state automatically shards with the computation
  - no locking / concurrency implications
  - asynchronous persistence

Sharing state between two operations in the same stage works with the
CoMap / CoFlatMap functions.
Sharing state across successive nodes does not work, because the functions
could be executed on different machines, and that would require remote and
synchronized state updates.

Do you think you can use the CoMap / CoFlatMap functions for this?

Greetings,
Stephan
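
To make the suggestion concrete, here is a minimal sketch of sharing keyed state through a CoFlatMapFunction on two connected streams that are keyed on the same field (the types, field positions, and credit/debit framing are illustrative; the state API details vary slightly between Flink versions):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class SharedKeyedStateSketch {

    // Both inputs are keyed by the same field, so flatMap1 and flatMap2
    // see the same per-key state instance.
    public static class BalanceUpdater
            extends RichCoFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> balance;

        @Override
        public void open(Configuration parameters) {
            balance = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("balance", Long.class, 0L));
        }

        @Override
        public void flatMap1(Tuple2<String, Long> credit, Collector<Tuple2<String, Long>> out) throws Exception {
            long updated = balance.value() + credit.f1;
            balance.update(updated);
            out.collect(Tuple2.of(credit.f0, updated));
        }

        @Override
        public void flatMap2(Tuple2<String, Long> debit, Collector<Tuple2<String, Long>> out) throws Exception {
            long updated = balance.value() - debit.f1;
            balance.update(updated);
            out.collect(Tuple2.of(debit.f0, updated));
        }
    }

    public static DataStream<Tuple2<String, Long>> wire(
            DataStream<Tuple2<String, Long>> credits,
            DataStream<Tuple2<String, Long>> debits) {
        return credits.keyBy(0)
                .connect(debits.keyBy(0))
                .flatMap(new BalanceUpdater());
    }
}
```

Note that this only helps when both inputs can be keyed by the same value; it does not, by itself, address the original question of re-keying the same record by a different field in a later stage.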


On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock  wrote:

> I'm trying to do something that seems like it should be possible, but my
> implementation doesn't behave as expected, and I'm not sure how else to
> express it.
>
> Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and
> I want to keyBy(1), flatMap with state associated with Alice, then keyBy(2)
> with state associated with Bob.  The trick is, when I later get a tuple
> like (Bob, Alice, 1), I want the first operator to see the state that was
> updated in the second op previously.  Is this possible?  I tried
>  implementing both operators as one, getting the state by descriptor in the
> flatMap body, and even instantiating the operator only once; the behavior
> is, as you might guess, that the state in stage 1 doesn't include changes
> made previously in stage 2.
>
> Is there any way to do this without throwing away the parallelism?
>
> Thanks in advance!
> ~Michael
>


Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Stephan Ewen
Hi!

The file "/home/hadoop/LICENSE.txt" probably exists only on the machine
that starts the job (your workstation, laptop), not in the cluster. The
Flink processes in the cluster cannot find the file under that address.

The input data must be in a filesystem that all cluster nodes can access,
like S3, HDFS, a mounted NFS, ...

Stephan


On Tue, Aug 23, 2016 at 7:54 PM, Foster, Craig  wrote:

> I'm trying to use the WordCount example with the local file system, but
> it's giving me a permissions error or it's not finding it. It works just fine
> for input and output on S3. What is the correct URI usage for the local
> file system and HDFS?
>
>
>
> I have installed Flink on EMR and am just using the flink run script to
> start the job:
>
>
>
> % flink run -m yarn-cluster -yn 2 
> /usr/lib/flink/examples/streaming/WordCount.jar
> --input file:///home/hadoop/LICENSE.txt
>
>
>
> 
>
>
>
> The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309
> (Streaming WordCount)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.streaming.api.environment.
> StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>
> at org.apache.flink.streaming.examples.wordcount.WordCount.
> main(WordCount.java:94)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
>
> at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at org.apache.flink.client.CliFrontend.executeProgramBlocking(
> CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1192)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:
> 1120)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.
> applyOrElse(YarnJobManager.scala:153)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$
> anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:33)
>
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(
> AbstractPartialFunction.scala:25)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>
> at org.apache.flink.runtime.jobmanager.JobManager.
> aroundReceive(JobManager.scala:107)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> 

sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
I'm trying to do something that seems like it should be possible, but my
implementation doesn't behave as expected, and I'm not sure how else to
express it.

Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and I
want to keyBy(1), flatMap with state associated with Alice, then keyBy(2)
with state associated with Bob.  The trick is, when I later get a tuple
like (Bob, Alice, 1), I want the first operator to see the state that was
updated in the second op previously.  Is this possible?  I tried
 implementing both operators as one, getting the state by descriptor in the
flatMap body, and even instantiating the operator only once; the behavior
is, as you might guess, that the state in stage 1 doesn't include changes
made previously in stage 2.

Is there any way to do this without throwing away the parallelism?

Thanks in advance!
~Michael


WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Foster, Craig
I'm trying to use the WordCount example with the local file system, but it's 
giving me a permissions error or it's not finding it. It works just fine for 
input and output on S3. What is the correct URI usage for the local file system 
and HDFS?

I have installed Flink on EMR and am just using the flink run script to start 
the job:

% flink run -m yarn-cluster -yn 2 
/usr/lib/flink/examples/streaming/WordCount.jar --input 
file:///home/hadoop/LICENSE.txt



The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 
(Streaming WordCount)
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user 
running Flink ('yarn') has insufficient permissions to access it.
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt 
does not exist or the user 

JobManager HA without Distributed FileSystem

2016-08-23 Thread Konstantin Knauf
Hi all,

the documentation of JobManager HA [1] explains that HA is only possible
with the FS state backend as Job Manager metadata is saved there.

What are the particular problems using JobManager HA with the
MemoryStatebackend?

As I understand it, the state is checkpointed to all JobManagers
(leaders + standby) when using the MemoryStateBackend, or am I wrong here?

Follow Up Question: Is it generally possible to setup a highly
available, at-least-once (source: Kafka) pipeline without a distributed
filesystem (only local FS and Zookeeper) for the checkpoints?

Cheers,

Konstantin


[1]
https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-23 Thread Maximilian Michels
Created an issue and fix should be there soon:
https://issues.apache.org/jira/browse/FLINK-4454

Thanks,
Max

On Tue, Aug 23, 2016 at 4:38 PM, Maximilian Michels  wrote:
> Hi!
>
> Yes, this is a bug. However, there seems to be something wrong with
> the config directory because Flink fails to load the default value
> ("localhost") from the config. If you had a default value for the job
> manager in flink-conf.yaml, it wouldn't fail but only display a wrong
> job manager url. Note that it still connects to the right job manager
> afterwards.
>
> Sorry for the trouble.
>
> Thanks,
> Max
>
> On Tue, Aug 23, 2016 at 11:02 AM, Ufuk Celebi  wrote:
>> You are right that this config key is not needed in this case.
>>
>> The ClusterClient has been refactored between Flink 1.0 and 1.1 and
>> the config parsing might be too strict in this case. It expects the
>> IPC address to be set, which is not necessary as you say. It should be
>> very easy to fix for 1.1.2. Let's confirm that it is actually a bug
>> with Max and file an issue afterwards.
>>
>> @Max: can you confirm whether this is correct?
>>
>>
>> On Tue, Aug 23, 2016 at 7:24 AM, Hironori Ogibayashi
>>  wrote:
>>> Hello,
>>>
>>> After I upgraded to 1.1.1, I am getting error when submitting job with
>>> "flink run"
>>>
>>> The command and result is like this. It has been working with Flink 1.0.3.
>>>
>>> ---
>>>  % FLINK_CONF_DIR=~/opt/flink/conf ~/opt/flink/flink-1.1.1/bin/flink
>>> run -c MyJob target/my-flink-job.jar
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> java.lang.RuntimeException: Failed to retrieve JobManager address
>>> at 
>>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:244)
>>> at 
>>> org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:78)
>>> at 
>>> org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:887)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:237)
>>> at 
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>>> Caused by: java.lang.IllegalArgumentException: hostname can't be null
>>> at java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>>> at java.net.InetSocketAddress.<init>(InetSocketAddress.java:216)
>>> at 
>>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:242)
>>> ... 5 more
>>> ---
>>>
>>> I am using JobManager HA and I set "recovery.mode: zookeeper",
>>> recovery.zookeeper.quorum, and recovery.zookeeper.path.root in my
>>> flink-conf.yaml.
>>> So, the client should be able to get JobManager address from zookeeper.
>>> If I explicitly specify JobManager address with -m option, it works.
>>>
>>> Am I missing something?
>>>
>>> Regards,
>>> Hironori Ogibayashi


Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-23 Thread Maximilian Michels
Hi!

Yes, this is a bug. However, there seems to be something wrong with
the config directory because Flink fails to load the default value
("localhost") from the config. If you had a default value for the job
manager in flink-conf.yaml, it wouldn't fail but only display a wrong
job manager url. Note that it still connects to the right job manager
afterwards.

Sorry for the trouble.

Thanks,
Max

On Tue, Aug 23, 2016 at 11:02 AM, Ufuk Celebi  wrote:
> You are right that this config key is not needed in this case.
>
> The ClusterClient has been refactored between Flink 1.0 and 1.1 and
> the config parsing might be too strict in this case. It expects the
> IPC address to be set, which is not necessary as you say. It should be
> very easy to fix for 1.1.2. Let's confirm that it is actually a bug
> with Max and file an issue afterwards.
>
> @Max: can you confirm whether this is correct?
>
>
> On Tue, Aug 23, 2016 at 7:24 AM, Hironori Ogibayashi
>  wrote:
>> Hello,
>>
>> After I upgraded to 1.1.1, I am getting error when submitting job with
>> "flink run"
>>
>> The command and result is like this. It has been working with Flink 1.0.3.
>>
>> ---
>>  % FLINK_CONF_DIR=~/opt/flink/conf ~/opt/flink/flink-1.1.1/bin/flink
>> run -c MyJob target/my-flink-job.jar
>>
>> 
>>  The program finished with the following exception:
>>
>> java.lang.RuntimeException: Failed to retrieve JobManager address
>> at 
>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:244)
>> at 
>> org.apache.flink.client.program.StandaloneClusterClient.getClusterIdentifier(StandaloneClusterClient.java:78)
>> at 
>> org.apache.flink.client.CliFrontend.createClient(CliFrontend.java:887)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:237)
>> at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: java.lang.IllegalArgumentException: hostname can't be null
>> at java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>> at java.net.InetSocketAddress.<init>(InetSocketAddress.java:216)
>> at 
>> org.apache.flink.client.program.ClusterClient.getJobManagerAddressFromConfig(ClusterClient.java:242)
>> ... 5 more
>> ---
>>
>> I am using JobManager HA and I set "recovery.mode: zookeeper",
>> recovery.zookeeper.quorum, and recovery.zookeeper.path.root in my
>> flink-conf.yaml.
>> So, the client should be able to get JobManager address from zookeeper.
>> If I explicitly specify JobManager address with -m option, it works.
>>
>> Am I missing something?
>>
>> Regards,
>> Hironori Ogibayashi


Re: flink on yarn - Fatal error in AM: The ContainerLaunchContext was not set

2016-08-23 Thread Maximilian Michels
Hi Mira,

Does using the fully-qualified hostname solve the issue?

Thanks,
Max

On Mon, Aug 22, 2016 at 1:38 PM, Miroslav Gajdoš
 wrote:
> Here is the log from yarn application - run on another cluster (this
> time cdh5.7.0, but with similar configuration). Check the hostnames; in
> configuration, there are aliases used and the difference from fqdn may
> be the cause, judging by the log (exception at line 87)...
>
> http://pastebin.com/iimPVbXB
>
> Thanks,
> Mira
>
>
>
> Maximilian Michels píše v Pá 19. 08. 2016 v 09:12 +0200:
>> Hi Mira,
>>
>> If I understood correctly, the log output should be for Flink 1.1.1.
>> However, there are classes present in the log which don't exist in
>> Flink 1.1.1, e.g. FlinkYarnClient. Could you please check if you
>> posted the correct log?
>>
>> Also, it would be good to have not only the client log but also the
>> log of the Flink Yarn application.
>>
>> Thanks,
>> Max
>>
>> On Thu, Aug 18, 2016 at 3:20 PM, Miroslav Gajdoš
>>  wrote:
>> >
>> > Tried to build it from source as well as use prebuilt binary
>> > release
>> > (v1.1.1), the last one produced this log output:
>> > http://pastebin.com/3L5Yhs9x
>> >
>> > Application in yarn still fails on "Fatal error in AM: The
>> > ContainerLaunchContext was not set".
>> >
>> > Mira
>> >
>> > Miroslav Gajdoš píše v Čt 18. 08. 2016 v 10:36 +0200:
>> > >
>> > > Hi Max,
>> > >
>> > > we are building it from sources and package it for debian. I can
>> > > try
>> > > to
>> > > use the binary release for hadoop 2.6.0.
>> > >
>> > > Regarding zookeeper, we do not share instances between dev and
>> > > production.
>> > >
>> > > Thanks,
>> > > Miroslav
>> > >
>> > > Maximilian Michels píše v Čt 18. 08. 2016 v 10:17 +0200:
>> > > >
>> > > >
>> > > > Hi Miroslav,
>> > > >
>> > > > From the logs it looks like you're using Flink version 1.0.x.
>> > > > The
>> > > > ContainerLaunchContext is always set by Flink. I'm wondering
>> > > > why
>> > > > this
>> > > > error can still occur. Are you using the default Hadoop version
>> > > > that
>> > > > comes with Flink (2.3.0)? You could try the Hadoop 2.6.0 build
>> > > > of
>> > > > Flink.
>> > > >
>> > > > Does your Dev cluster share the Zookeeper installation with the
>> > > > production cluster? I'm wondering because it receives incorrect
>> > > > leadership information although the leading JobManager seems to
>> > > > be
>> > > > attempting to register at the ApplicationMaster.
>> > > >
>> > > > Best,
>> > > > Max
>> > > >
>> > > > On Tue, Aug 16, 2016 at 1:28 PM, Miroslav Gajdoš
>> > > >  wrote:
>> > > > >
>> > > > >
>> > > > >
>> > > > > Log from yarn session runner is here:
>> > > > > http://pastebin.com/xW1W4HNP
>> > > > >
>> > > > > Our Hadoop distribution is from Cloudera, resourcemanager
>> > > > > version:
>> > > > > 2.6.0-cdh5.4.5, it runs in HA mode (there could be some
>> > > > > redirecting
>> > > > > on
>> > > > > accessing resourcemanager and/or namenode to active one).
>> > > > >
>> > > > > Ufuk Celebi píše v Út 16. 08. 2016 v 12:18 +0200:
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > This could be a bug in Flink. Can you share the complete
>> > > > > > logs
>> > > > > > of
>> > > > > > the
>> > > > > > run? CC'ing Max who worked on the YARN client recently who
>> > > > > > might
>> > > > > > have
>> > > > > > an idea in which cases Flink would not set the context.
>> > > > > >
>> > > > > > On Tue, Aug 16, 2016 at 11:00 AM, Miroslav Gajdoš
>> > > > > >  wrote:
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > Hi guys,
>> > > > > > >
>> > > > > > > i've run into some problems with flink/yarn. I try to
>> > > > > > > deploy
>> > > > > > > flink
>> > > > > > > to
>> > > > > > > our cluster using /usr/lib/flink-scala2.10/bin/yarn-
>> > > > > > > session.sh,
>> > > > > > > but
>> > > > > > > the
>> > > > > > > yarn application does not even start, it goes from
>> > > > > > > accepted
>> > > > > > > to
>> > > > > > > finished/failed. Yarn info on resourcemanager looks like
>> > > > > > > this:
>> > > > > > >
>> > > > > > > User:   wa-flink
>> > > > > > > Name:   Flink session with 3 TaskManagers
> > > > > > > Application Type: Apache Flink
>> > > > > > > Application Tags:
>> > > > > > > State:  FINISHED
>> > > > > > > FinalStatus:FAILED
>> > > > > > > Started:Mon Aug 15 18:02:42 +0200 2016
>> > > > > > > Elapsed:16sec
>> > > > > > > Tracking URL:   History
>> > > > > > > Diagnostics:Fatal error in AM: The
>> > > > > > > ContainerLaunchContext
>> > > > > > > was
>> > > > > > > not set.
>> > > > > > >
>> > > > > > > On dev cluster, applications deploys without problem,
>> > > > > > > this
>> > > > > > > happens
>> > > > > > > only
>> > > > > > > in production.
>> > > > > > >
>> > > > > > > What could be wrong?
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > 

Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Perfect - This explains it very clearly. Thank you very much!

Sameer

On Tue, Aug 23, 2016 at 9:31 AM, Tzu-Li (Gordon) Tai 
wrote:

> Slight misunderstanding here. The one thread per Kafka broker happens
> *after* the assignment of Kafka partitions to the source instances. So,
> with a total of 10 partitions and 10 source instances, each source instance
> will first be assigned 1 partition. Then, each source instance will create
> 1 thread for every individual broker that holds partitions that the source
> instance is assigned. The per-broker threading model of the Kafka consumer
> has nothing to do with the initial assignment of partitions to source
> instances.
>
> Another example to explain this more clearly:
> Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
> parallelism 5. Each source instance will still have 2 partitions. If the
> 2 partitions belong to the same broker, the source instance will have only
> 1 consuming threads; otherwise if the 2 partitions belong to different
> brokers, the source instance will have 2 consuming threads.
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Gordon,
>
> I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
> I have a parallelism of 10 defined for the job. I see all my 10
> source->Mapper->assignTimestamps receiving and sending data. If there is
> only one source instance per broker how does that happen?
>
> Thanks,
> Sameer
>
> On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi!
>>
>> Kinesis shards should be ideally evenly assigned to the source instances.
>> So, with your example of source parallelism of 10 and 20 shards, each
>> source instance will have 2 shards and will have 2 threads consuming them
>> (therefore, not in round robin).
>>
>> For the Kafka consumer, in the source instances there will be one
>> consuming thread per broker, instead of partition. So, if a source instance
>> is assigned partitions that happen to be on the same broker, the source
>> instance will only create 1 thread to consume all of them.
>>
>> You are correct that currently the Kafka consumer does not handle
>> repartitioning transparently like the Kinesis connector, but we’re working
>> on this :)
>>
>> Regards,
>> Gordon
>>
>> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>>
>> Hi,
>>
>> The documentation says that there will be one thread per shard. If my
>> streaming job runs with a parallelism of 10 and there are 20 shards, are
>> more threads going to be launched within  a task slot running a source
>> function to consume the additional shards or will one source function
>> instance consume 2 shards in round robin.
>>
>> Is it any different for Kafka? Based on the documentation my
>> understanding is that if there are 10 source function instances and 20
>> partitions, each one will read 2 partitions.
>>
>> Also if partitions are added to Kafka are they handled by the existing
>> streaming job or does it need to be restarted? It appears as though Kinesis
>> handles it via the consumer constantly checking for more shards.
>>
>> Thanks,
>> Sameer
>>
>>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi Sameer,

I realized you might be a bit confused between “source instances (which in
general are Flink tasks)” and “threads” in my previous explanations. The
per-broker threads in the Kafka consumer and per-shard threads in the
Kinesis consumer I mentioned are threads created by the source instance’s
main thread. So, they have nothing to do with the assignment of
shard/partitions to the source instances. The threading models previously
explained refers to how a single source instance consumes multiple
shards/partitions that are assigned to it.

Hope this clarifies things for you more :)

Regards,
Gordon


On August 23, 2016 at 9:31:58 PM, Tzu-Li (Gordon) Tai (tzuli...@gmail.com)
wrote:

Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, and each hold 5 partitions, and have source
parallelism 5. Each source instance will still have 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming thread; otherwise if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 Broker but a topic has 10 partitions.
I have a parallelism of 10 defined for the job. I see all my 10
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within  a task slot running a source
> function to consume the additional shards or will one source function
> instance consume 2 shards in round robin.
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Lohith Samaga M
Hi
Maybe you could use Cassandra to store and fetch all such reference data.  
This way the reference data can be updated without restarting your application.

Lohith

Sent from my Sony Xperia™ smartphone
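
As an illustration of that suggestion, a rough sketch of looking up reference data from Cassandra inside a rich function, using the DataStax Java driver (the contact point, keyspace, table, and column names are made up for the example; a per-event remote lookup may also need local caching to keep latency down):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CassandraEnrichment extends RichMapFunction<String, String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel task; the reference data stays in Cassandra
        // and can be updated without restarting the Flink job.
        cluster = Cluster.builder().addContactPoint("cassandra-host").build();
        session = cluster.connect("reference_keyspace");
    }

    @Override
    public String map(String eventKey) {
        Row row = session.execute(
                "SELECT value FROM reference_table WHERE key = ?", eventKey).one();
        return row == null ? eventKey : eventKey + " -> " + row.getString("value");
    }

    @Override
    public void close() {
        if (session != null) { session.close(); }
        if (cluster != null) { cluster.close(); }
    }
}
```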


 Baswaraj Kasture wrote 

Thanks Kostas !
I am using DataStream API.

I have a few config/property files (key-value text files) and also business 
rule files (JSON).
These rules and configurations are needed when we process incoming events.
Is there any way to share them with the task nodes from the driver program?
I think this is a very common use case and I am sure other users face similar 
issues.

+Baswaraj

On Mon, Aug 22, 2016 at 4:56 PM, Kostas Kloudas 
> wrote:
Hello Baswaraj,

Are you using the DataSet (batch) or the DataStream API?

If you are in the first, you can use a broadcast variable for your task.
If you are using the DataStream one, then there is no proper support for that.

Thanks,
Kostas
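
For readers following along, a minimal sketch of the broadcast-variable pattern mentioned above, for the DataSet API (the data set contents and the name "rules" are placeholders):

```java
import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Small reference data set that should be visible to every parallel task.
        DataSet<String> rules = env.fromElements("rule-1", "rule-2");
        DataSet<Integer> events = env.fromElements(1, 2, 3);

        events.map(new RichMapFunction<Integer, String>() {
            private transient List<String> broadcastRules;

            @Override
            public void open(Configuration parameters) {
                // Fetch the broadcast data set by name in each task.
                broadcastRules = getRuntimeContext().getBroadcastVariable("rules");
            }

            @Override
            public String map(Integer value) {
                return value + " checked against " + broadcastRules.size() + " rules";
            }
        }).withBroadcastSet(rules, "rules")  // register under the name used in open()
          .print();
    }
}
```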

On Aug 20, 2016, at 12:33 PM, Baswaraj Kasture 
> wrote:

I am running a Flink standalone cluster.

I have a text file that needs to be shared across tasks when I submit my 
application.
In other words, put this text file in the class path of the running tasks.

How can we achieve this with Flink?

In Spark, spark-submit has a --jars option that puts all the specified files in 
the class path of the executors (executors run in separate JVMs and are spawned 
dynamically, so it is possible).

Flink's task managers run tasks in separate threads under the TaskManager JVM (?), 
so how can we make this text file accessible to all tasks spawned by the current 
application?

Using HDFS, NFS, or including the file in the program jar is one way that I know, 
but I am looking for a solution that allows me to provide the text file at run time 
and still have it accessible in all tasks.
Thanks.




Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, each holding 5 partitions, and a source
parallelism of 5. Each source instance will still be assigned 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming thread; otherwise, if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.

Regards,
Gordon
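
A minimal sketch of the setup discussed in this thread, assuming the Kafka 0.8
connector; the broker/ZooKeeper addresses, topic name and group id below are
placeholders, and the partition counts in the comments are only the hypothetical
numbers from this thread:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceParallelismSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092");   // placeholder
        props.setProperty("zookeeper.connect", "zookeeper:2181");      // placeholder
        props.setProperty("group.id", "threading-demo");               // placeholder

        // Hypothetical layout from this thread: a topic with 20 partitions read with
        // a source parallelism of 10, so each source subtask is assigned 2 partitions.
        // Consuming threads are then created per broker inside each subtask, as
        // described above, not one thread per partition across the whole job.
        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props))
                .setParallelism(10);

        events.print();
        env.execute("Kafka source parallelism sketch");
    }
}

Setting the parallelism explicitly on the source only controls how many subtasks
the partitions are spread over; the per-broker threads are an internal detail of
the connector.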


On August 23, 2016 at 8:47:15 PM, Sameer W (sam...@axiomine.com) wrote:

Gordon,

I tried the following with Kafka - 1 broker, but a topic with 10 partitions.
I have a parallelism of 10 defined for the job. I see all 10 of my
source->Mapper->assignTimestamps instances receiving and sending data. If there is
only one source instance per broker, how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within a task slot running a source
> function to consume the additional shards, or will one source function
> instance consume 2 shards in round robin?
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Gordon,

I tried the following with Kafka - 1 broker, but a topic with 10 partitions.
I have a parallelism of 10 defined for the job. I see all 10 of my
source->Mapper->assignTimestamps instances receiving and sending data. If there is
only one source instance per broker, how does that happen?

Thanks,
Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within a task slot running a source
> function to consume the additional shards, or will one source function
> instance consume 2 shards in round robin?
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Baswaraj Kasture
Thanks Kostas !
I am using DataStream API.

I have a few config/property files (key-value text files) and also have
business rule files (JSON).
These rules and configurations are needed when we process an incoming event.
Is there any way to share them with the task nodes from the driver program?
I think this is a very common use case and I am sure other users may face
similar issues.

+Baswaraj

On Mon, Aug 22, 2016 at 4:56 PM, Kostas Kloudas  wrote:

> Hello Baswaraj,
>
> Are you using the DataSet (batch) or the DataStream API?
>
> If you are using the first, you can use a broadcast variable for
> your task.
> If you are using the DataStream one, then there is no proper support for
> that.
>
> Thanks,
> Kostas
>
> On Aug 20, 2016, at 12:33 PM, Baswaraj Kasture 
> wrote:
>
> I am running a Flink standalone cluster.
>
> I have a text file that needs to be shared across tasks when I submit my
> application.
> In other words, put this text file in the classpath of the running tasks.
>
> How can we achieve this with Flink?
>
> In Spark, spark-submit has a --jars option that puts all the specified files
> in the classpath of the executors (executors run in separate JVMs and are
> spawned dynamically, so it is possible).
>
> Flink's task managers run tasks in separate threads under the TaskManager JVM
> (?), so how can we make this text file accessible to all tasks spawned
> by the current application?
>
> Using HDFS, NFS or including the file in the program jar is one way that I know,
> but I am looking for a solution that allows me to provide the text file at run
> time and still have it accessible in all tasks.
> Thanks.
>
>
>
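
For the DataSet route Kostas mentions, a minimal sketch of the broadcast-variable
approach could look like the following; the HDFS path, variable name and toy
input are made up for illustration:

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastConfigSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical reference data: key=value lines from a config file on HDFS.
        DataSet<String> configLines = env.readTextFile("hdfs:///config/app.properties");

        DataSet<String> events = env.fromElements("event-a", "event-b", "event-c");

        events.map(new RichMapFunction<String, String>() {

            private List<String> config;

            @Override
            public void open(Configuration parameters) {
                // The broadcast set is available in every parallel task instance.
                config = getRuntimeContext().getBroadcastVariable("appConfig");
            }

            @Override
            public String map(String event) {
                return event + " (seen " + config.size() + " config entries)";
            }
        }).withBroadcastSet(configLines, "appConfig")
          .print();
    }
}

Note that this is a DataSet-only mechanism; as stated above, the DataStream API
has no equivalent built in, so streaming jobs still need an external store or a
file shipped some other way.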


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
No, it does not default to Ingestion Time. For other connectors in general,
you have to explicitly call `assignTimestampsAndWatermarks()` before the
first operator in the topology that works on time (e.g. windows), otherwise
the job will fail as soon as records start coming in.

Currently, I think only the Kinesis connector and, shortly in the future,
the Kafka 0.10 connector will have default timestamps when the topology uses
Event Time. Otherwise, the behaviour is as described above.

Regards,
Gordon
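
For illustration, a minimal sketch of that explicit call, assuming a placeholder
socket source in place of the real connector:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;

public class ExplicitIngestionTimeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder source; most connectors do not attach timestamps on their own.
        DataStream<String> source = env.socketTextStream("localhost", 9999);

        // Explicitly attach ingestion-time timestamps and watermarks before any
        // time-based operator (windows, etc.) sees the records.
        DataStream<String> withTimestamps =
                source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<String>());

        withTimestamps.print();
        env.execute("Explicit ingestion-time sketch");
    }
}

The IngestionTimeExtractor assigns the current processing time as each record's
timestamp, which is what makes the explicit call behave like ingestion time.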


On August 23, 2016 at 7:34:25 PM, Sameer W (sam...@axiomine.com) wrote:

Thanks - Is there also a default behavior for non-Kinesis streams? If I set
the time characteristic to Event Time but do not assign timestamps or
generate watermarks by invoking the assignTimestampsAndWatermarks function,
does that default to using Ingestion Time? Or, in other words, is it as if I
invoked this method on the source stream:

assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())

Sameer

On Tue, Aug 23, 2016 at 7:29 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> For the Kinesis consumer, when you use Event Time but do not explicitly
> assign timestamps, the Kinesis server-side timestamp (the time which
> Kinesis received the record) is attached to the record as default, not
> Flink’s ingestion time.
>
> Does this answer your question?
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> If you do not explicitly assign timestamps and watermarks when using Event
> Time, does it automatically default to using Ingestion Time?
>
> I was reading the Kinesis integration section and came across the note
> below, which raised the above question. I saw another place where you
> explicitly use Event Time with ingestion time with the following - .
> assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.
>
> Does that line have to be called explicitly or is it the default?
>
>
> "If streaming topologies choose to use the event time notion for
> record timestamps, an *approximate arrival timestamp* will be used by
> default. This timestamp is attached to records by Kinesis once they were
> successfully received and stored by streams. Note that this timestamp is
> typically referred to as a Kinesis server-side timestamp, and there are no
> guarantees about the accuracy or order correctness (i.e., the timestamps
> may not always be ascending)."
>
> Thanks,
> Sameer
>
>


Setting up zeppelin with flink

2016-08-23 Thread Frank Dekervel
Hello,

I am trying to set up Apache Zeppelin with a Flink cluster (one JobManager, one
TaskManager).

What I did was use the Dockerfiles in flink-contrib/docker-flink plus the
latest binary release of Apache Zeppelin with all interpreters:

https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/Dockerfile
(I changed the Flink version to 1.0.3 to match Zeppelin's Flink version.)

I built another Docker image around the latest binary release of Zeppelin
(with all interpreters), and I reconfigured the Flink interpreter:

   - connect to existing process
   - host: jobmanager, port: 6123
   - I removed all other properties

When I try to submit a Flink job, I get an error state and the following
exception appears in the log (nothing appears in the JobManager log):

ERROR [2016-08-23 11:44:57,932] ({Thread-16} JobProgressPoller.java[run]:54) - Can not get or update progress
org.apache.zeppelin.interpreter.InterpreterException: org.apache.thrift.transport.TTransportException
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:373)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getProgress(LazyOpenInterpreter.java:111)
    at org.apache.zeppelin.notebook.Paragraph.progress(Paragraph.java:237)
    at org.apache.zeppelin.scheduler.JobProgressPoller.run(JobProgressPoller.java:51)
Caused by: org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getProgress(RemoteInterpreterService.java:296)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getProgress(RemoteInterpreterService.java:281)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getProgress(RemoteInterpreter.java:370)
    ... 3 more

Flink in local mode works fine in Zeppelin.
Could somebody point me to what I'm doing wrong?

Thanks a lot!
Frank


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Thanks - Is there also a default behavior for non-Kinesis streams? If I set
the time characteristic to Event Time but do not assign timestamps or
generate watermarks by invoking the assignTimestampsAndWatermarks function,
does that default to using Ingestion Time? Or, in other words, is it as if I
invoked this method on the source stream:

assignTimestampsAndWatermarks(new IngestionTimeExtractor<>())

Sameer

On Tue, Aug 23, 2016 at 7:29 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> For the Kinesis consumer, when you use Event Time but do not explicitly
> assign timestamps, the Kinesis server-side timestamp (the time which
> Kinesis received the record) is attached to the record as default, not
> Flink’s ingestion time.
>
> Does this answer your question?
>
> Regards,
> Gordon
>
>
> On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> If you do not explicitly assign timestamps and watermarks when using Event
> Time, does it automatically default to using Ingestion Time?
>
> I was reading the Kinesis integration section and came across the note
> below, which raised the above question. I saw another place where you
> explicitly use Event Time with ingestion time with the following - .
> assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.
>
> Does that line have to be called explicitly or is it the default?
>
>
> "If streaming topologies choose to use the event time notion for
> record timestamps, an *approximate arrival timestamp* will be used by
> default. This timestamp is attached to records by Kinesis once they were
> successfully received and stored by streams. Note that this timestamp is
> typically referred to as a Kinesis server-side timestamp, and there are no
> guarantees about the accuracy or order correctness (i.e., the timestamps
> may not always be ascending)."
>
> Thanks,
> Sameer
>
>


Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi,

For the Kinesis consumer, when you use Event Time but do not explicitly
assign timestamps, the Kinesis server-side timestamp (the time which
Kinesis received the record) is attached to the record as default, not
Flink’s ingestion time.

Does this answer your question?

Regards,
Gordon


On August 23, 2016 at 6:42:26 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

If you do not explicitly assign timestamps and watermarks when using Event
Time, does it automatically default to using Ingestion Time?

I was reading the Kinesis integration section and came across the note
below, which raised the above question. I saw another place where you
explicitly use Event Time with ingestion time with the following
- .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.

Does that line have to be called explicitly or is it the default?


"If streaming topologies choose to use the event time notion for
record timestamps, an *approximate arrival timestamp* will be used by
default. This timestamp is attached to records by Kinesis once they were
successfully received and stored by streams. Note that this timestamp is
typically referred to as a Kinesis server-side timestamp, and there are no
guarantees about the accuracy or order correctness (i.e., the timestamps
may not always be ascending)."

Thanks,
Sameer


Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Thanks Gordon - Appreciate the fast response.

Sameer

On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi!
>
> Kinesis shards should be ideally evenly assigned to the source instances.
> So, with your example of source parallelism of 10 and 20 shards, each
> source instance will have 2 shards and will have 2 threads consuming them
> (therefore, not in round robin).
>
> For the Kafka consumer, in the source instances there will be one
> consuming thread per broker, instead of partition. So, if a source instance
> is assigned partitions that happen to be on the same broker, the source
> instance will only create 1 thread to consume all of them.
>
> You are correct that currently the Kafka consumer does not handle
> repartitioning transparently like the Kinesis connector, but we’re working
> on this :)
>
> Regards,
> Gordon
>
> On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:
>
> Hi,
>
> The documentation says that there will be one thread per shard. If my
> streaming job runs with a parallelism of 10 and there are 20 shards, are
> more threads going to be launched within a task slot running a source
> function to consume the additional shards, or will one source function
> instance consume 2 shards in round robin?
>
> Is it any different for Kafka? Based on the documentation my understanding
> is that if there are 10 source function instances and 20 partitions, each
> one will read 2 partitions.
>
> Also if partitions are added to Kafka are they handled by the existing
> streaming job or does it need to be restarted? It appears as though Kinesis
> handles it via the consumer constantly checking for more shards.
>
> Thanks,
> Sameer
>
>


Re: FLINK-4329 fix version

2016-08-23 Thread Ufuk Celebi
On Tue, Aug 23, 2016 at 12:28 PM, Yassine Marzougui
 wrote:
> The fix version of FLINK-4329 in JIRA is set to 1.1.1, but 1.1.1 is already
> released. Should I expect it to be fixed in the next release? And will a
> patch be available meanwhile? Thanks.

Hey Yassine! The JIRA fix version tag is incorrect as you say (I just
removed it). The best thing would be to ask in the pull request
directly: https://github.com/apache/flink/pull/2350. I would assume
that it will be included in 1.1.2 too.

– Ufuk


Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi!

Kinesis shards should be ideally evenly assigned to the source instances.
So, with your example of source parallelism of 10 and 20 shards, each
source instance will have 2 shards and will have 2 threads consuming them
(therefore, not in round robin).

For the Kafka consumer, in the source instances there will be one consuming
thread per broker, instead of partition. So, if a source instance is
assigned partitions that happen to be on the same broker, the source
instance will only create 1 thread to consume all of them.

You are correct that currently the Kafka consumer does not handle
repartitioning transparently like the Kinesis connector, but we’re working
on this :)

Regards,
Gordon

On August 23, 2016 at 6:50:31 PM, Sameer W (sam...@axiomine.com) wrote:

Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also if partitions are added to Kafka are they handled by the existing
streaming job or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer


Threading Model for Kinesis

2016-08-23 Thread Sameer W
Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also if partitions are added to Kafka are they handled by the existing
streaming job or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer


Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Hi,

If you do not explicitly assign timestamps and watermarks when using Event
Time, does it automatically default to using Ingestion Time?

I was reading the Kinesis integration section and came across the note
below, which raised the above question. I saw another place where you
explicitly use Event Time with ingestion time with the following
- .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());.

Does that line have to be called explicitly or is it the default?


"If streaming topologies choose to use the event time notion for
record timestamps, an *approximate arrival timestamp* will be used by
default. This timestamp is attached to records by Kinesis once they were
successfully received and stored by streams. Note that this timestamp is
typically referred to as a Kinesis server-side timestamp, and there are no
guarantees about the accuracy or order correctness (i.e., the timestamps
may not always be ascending)."

Thanks,
Sameer


FLINK-4329 fix version

2016-08-23 Thread Yassine Marzougui
Hi all,

The fix version of FLINK-4329 in JIRA is set to 1.1.1, but 1.1.1 is already
released. Should I expect it to be fixed in the next release? And will a
patch be available meanwhile? Thanks.

Yassine


Re: Batch jobs with a very large number of input splits

2016-08-23 Thread Niels Basjes
I did more digging and finally understand what goes wrong.
I create a yarn-session with 50 slots.
Then I run my job, which (because my HBase table has hundreds of
regions) has a lot of input splits.
The job then runs with parallelism 50 because I did not specify the value.
As a consequence, the second job I start in the same yarn-session is faced
with 0 available task slots and fails with this exception:

08/23/2016 09:58:52 Job execution switched to status FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: .. Resources available to scheduler:
Number of instances=5, total number of slots=50, available slots=0

So my conclusion for now is that if you want to run batch jobs in a
yarn-session, then you MUST specify the parallelism for all steps; otherwise
the first job will fill the yarn-session completely and you cannot run
multiple jobs in parallel.

Is this conclusion correct?

Niels Basjes
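
A minimal sketch of that conclusion in code, assuming the DataSet API and a
made-up input path (the cap values are only examples; the right numbers depend
on how many slots should remain free in the session):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CappedParallelismSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Cap the whole job at 10 parallel subtasks so it does not claim every
        // slot of a shared yarn-session, no matter how many input splits
        // (e.g. HBase regions) the input format produces.
        env.setParallelism(10);

        DataSet<String> lines = env.readTextFile("hdfs:///data/input"); // placeholder path

        lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) {
                return line.toLowerCase();
            }
        })
        .setParallelism(5) // individual operators can be capped even further
        .print();
    }
}

The same overall cap can also be passed with the -p option of bin/flink run when
submitting the job to the session.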


On Fri, Aug 19, 2016 at 3:18 PM, Robert Metzger  wrote:

> Hi Niels,
>
> In Flink, you don't need one task per file, since splits are assigned
> lazily to reading tasks.
> What exactly is the error you are getting when trying to read that many
> input splits? (Is it on the JobManager?)
>
> Regards,
> Robert
>
> On Thu, Aug 18, 2016 at 1:56 PM, Niels Basjes  wrote:
>
>> Hi,
>>
>> I'm working on a batch process using Flink and I ran into an interesting
>> problem.
>> The number of input splits in my job is really really large.
>>
>> I currently have a HBase input (with more than 1000 regions) and in the
>> past I have worked with MapReduce jobs doing 2000+ files.
>>
>> The problem I have is that if I run such a job in a "small" yarn-session
>> (i.e. less than 1000 tasks) I get a fatal error indicating that there are
>> not enough resources.
>> For a continuous streaming job this makes sense, yet for a batch job
>> (like I'm having) this is an undesirable error.
>>
>> For my HBase situation I currently have a workaround by overriding the
>> createInputSplits method from the TableInputFormat and thus controlling the
>> input splits that are created.
>>
>> What is the correct way to solve this (no, my cluster is NOT big enough to
>> run that many parallel tasks)?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes