Re: flink-shaded-hadoop

2016-08-23 Thread Aljoscha Krettek
Hi, this might be due to a bug in the Flink 1.1.0 Maven dependencies. Can you try updating to Flink 1.1.1? Cheers, Aljoscha On Mon, 22 Aug 2016 at 07:48 wrote: > Hi, > everyone, when I use Scala version 2.10 and set up the sbt project (add > those: flink-core, flink-scala, flink-streaming-scala,

Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Jark Wu
Hi, I think what Baswaraj wants is exactly something like the Storm Distributed Cache API[1] (if I'm not misunderstanding). > The distributed cache feature in storm is used to efficiently distribute > files (or blobs, which is the equivalent terminology for a file in the > distributed cache and is

Dealing with Multiple sinks in Flink

2016-08-23 Thread Vinay Patil
Hi, In our Flink pipeline we are currently writing the data to multiple S3 objects/folders based on some conditions, so the issue I am facing is as follows: Consider these S3 folders: temp_bucket/processedData/20160823/ temp_bucket/rawData/20160822/ temp_bucket/errorData/20160821/ Now when

How to get latency info from benchmark

2016-08-23 Thread Eric Fukuda
Hi, I'm trying to benchmark Flink without Kafka as mentioned in this post ( http://data-artisans.com/extending-the-yahoo-streaming-benchmark/). After running flink.benchmark.state.AdvertisingTopologyFlinkState with user.local.event.generator in localConf.yaml set to 1, I ran flink.benchmark.utils.

Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Robert Metzger
Hi, the problem is that you are using the wrong Namenode port. The port is 8020, not 50070. On EMR, you actually don't need to specify the Namenode port at all. This command works for me: [hadoop@ip-172-31-23-104 ~]$ flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar
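Background on the two ports: in Hadoop 2.x, 50070 is the NameNode's HTTP web UI port, while 8020 is its default RPC port, which is what `hdfs://` URIs talk to; an `hdfs:///path` URI with no host at all falls back to the default filesystem from the Hadoop configuration. The Python helper below is a hypothetical pre-submission sanity check (not part of Flink or Hadoop), and it assumes the cluster still uses the Hadoop 2.x default ports.

```python
from urllib.parse import urlparse

# Hadoop 2.x defaults (assumption: the cluster has not overridden them).
NAMENODE_RPC_PORT = 8020      # the port hdfs:// URIs must address
NAMENODE_WEB_UI_PORT = 50070  # HTTP web UI, not usable in hdfs:// URIs


def check_hdfs_uri(uri: str) -> str:
    """Return a short diagnosis for an HDFS input/output URI (hypothetical helper)."""
    parsed = urlparse(uri)
    if parsed.scheme != "hdfs":
        return "not an hdfs:// URI"
    if parsed.port == NAMENODE_WEB_UI_PORT:
        return "port 50070 is the NameNode web UI; use the RPC port (default 8020)"
    if not parsed.netloc:
        # hdfs:///path: host and port come from the default filesystem setting.
        return "ok: hdfs:///path uses the default filesystem from the Hadoop config"
    return "ok"
```

For example, `check_hdfs_uri("hdfs://namenode:50070/input")` returns the web-UI warning, while `check_hdfs_uri("hdfs:///user/hadoop/input")` reports that the default filesystem will be used, matching the EMR advice of not specifying the port at all.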

Re: sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
Another approach I'm considering, which feels pretty kludgy, but I think should be acceptable for my current use: Only one stateful op, keyed on the same field, but with a flag field indicating the actual operation to be performed. The results of this op are output to a kafka (or whatever) queue,

Re: sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
Thanks for the quick response! I've been wondering about Connected streams and CoFlatMap, but either I don't see all the ways they can be used, or they don't solve my problem. Do you know of any examples outside of the documentation? My searches for "flink comap example" and similar haven't turne

Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Stephan Ewen
I would have to pull Robert into the loop, but my first guess is that this is a Hadoop version mismatch. Can you double check that the Hadoop version for which you downloaded Flink is the same as the one on the cluster? Greetings, Stephan On Tue, Aug 23, 2016 at 8:44 PM, Foster, Craig wrote:

Re: JobManager HA without Distributed FileSystem

2016-08-23 Thread Stephan Ewen
Hi! The state one can store in ZooKeeper is only very small (recommended is smaller than 1MB per handle). For HA, the JobManager needs to persist: - JobGraph - JAR files - Checkpoint Metadata Those are easily too large for ZooKeeper, which is why Flink currently requires a DFS to store tho
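The split Stephan describes (large artifacts in a distributed filesystem, only small handles in ZooKeeper) can be pictured with the toy Python model below. It is a sketch of the pattern only, not Flink's HA code: the two dicts stand in for the DFS and ZooKeeper, and the `dfs:///recovery/...` paths and `/flink/...` znode names are hypothetical.

```python
import hashlib

ZK_RECOMMENDED_MAX_BYTES = 1024 * 1024  # "smaller than 1MB per handle"


class ToyHaStore:
    """Toy model: large blobs go to a 'DFS', ZooKeeper only keeps small handles."""

    def __init__(self):
        self.dfs = {}        # stands in for HDFS/S3: path -> bytes
        self.zookeeper = {}  # stands in for ZooKeeper: znode -> small bytes

    def persist(self, name: str, blob: bytes) -> bytes:
        # Write the possibly huge payload (JobGraph, JARs, checkpoint metadata) to the DFS.
        digest = hashlib.sha1(blob).hexdigest()
        path = f"dfs:///recovery/{name}-{digest}"  # hypothetical path layout
        self.dfs[path] = blob
        # Store only the pointer to it in ZooKeeper.
        handle = path.encode("utf-8")
        if len(handle) >= ZK_RECOMMENDED_MAX_BYTES:
            raise ValueError("handle too large for ZooKeeper")
        self.zookeeper[f"/flink/{name}"] = handle  # hypothetical znode name
        return handle

    def recover(self, name: str) -> bytes:
        # A new leader reads the handle from ZooKeeper, then the data from the DFS.
        path = self.zookeeper[f"/flink/{name}"].decode("utf-8")
        return self.dfs[path]
```

A 5 MB JAR would be far over the recommended ZooKeeper limit, but in this model ZooKeeper only ever holds a path of a few dozen bytes, which is why the DFS remains a hard requirement for HA.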

Re: sharded state, 2-step operation

2016-08-23 Thread Stephan Ewen
Hi! This is a tricky one. State access and changes are not shared across operators in Flink. We chose that design because it makes it possible to work on "local" state in each operator - state automatically shards with the computation - no locking / concurrency implications - asynchronous pe
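To make "state automatically shards with the computation" concrete, here is a toy Python model of one keyed operator: every parallel instance owns a private state table, and each record is routed to exactly one instance by hashing its key. The CRC32-modulo routing is an assumption for illustration; Flink's actual key-to-instance hashing differs.

```python
import zlib
from collections import defaultdict


class ToyKeyedOperator:
    """Toy model of one operator whose keyed state is sharded across parallel instances.

    Each instance owns a private dict; nothing is shared between instances or with
    other operators, so no locking is needed.
    """

    def __init__(self, parallelism: int):
        self.parallelism = parallelism
        # One independent state table per parallel instance.
        self.state = [defaultdict(int) for _ in range(parallelism)]

    def instance_for(self, key: str) -> int:
        # Deterministic stand-in for Flink's key hashing (not the real function).
        return zlib.crc32(key.encode("utf-8")) % self.parallelism

    def process(self, key: str, value: int) -> int:
        # The record goes to exactly one instance, which updates only its local state.
        local = self.state[self.instance_for(key)]
        local[key] += value
        return local[key]
```

A second `ToyKeyedOperator` keyed on a different field (say, the "Bob" side of `(Alice, Bob, 1)`) would get its own separate `state` lists, so it has no way to read or modify the state the first operator holds for "Alice".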

Re: WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Stephan Ewen
Hi! The file "/home/hadoop/LICENSE.txt" probably exists only on the machine that starts the job (your workstation, laptop), not in the cluster. The Flink processes in the cluster cannot find the file under that address. The input data must be in a filesystem that all cluster nodes can access, lik

sharded state, 2-step operation

2016-08-23 Thread Michael Warnock
I'm trying to do something that seems like it should be possible, but my implementation doesn't behave as expected, and I'm not sure how else to express it. Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and I want to keyBy(1), flatMap with state associated with Alice, then

WordCount w/ YARN and EMR local filesystem and/or HDFS

2016-08-23 Thread Foster, Craig
I'm trying to use the WordCount example with the local file system, but it's giving me a permissions error or it's not finding the file. It works just fine for input and output on S3. What is the correct URI usage for the local file system and HDFS? I have installed Flink on EMR and am just using the f

JobManager HA without Distributed FileSystem

2016-08-23 Thread Konstantin Knauf
Hi all, the documentation of JobManager HA [1] explains that HA is only possible with the FS state backend, as JobManager metadata is saved there. What are the particular problems using JobManager HA with the MemoryStateBackend? As I understand it, the state is checkpointed to all JobManagers (le

Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-23 Thread Maximilian Michels
Created an issue and fix should be there soon: https://issues.apache.org/jira/browse/FLINK-4454 Thanks, Max On Tue, Aug 23, 2016 at 4:38 PM, Maximilian Michels wrote: > Hi! > > Yes, this is a bug. However, there seems to be something wrong with > the config directory because Flink fails to load

Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-23 Thread Maximilian Michels
Hi! Yes, this is a bug. However, there seems to be something wrong with the config directory because Flink fails to load the default value ("localhost") from the config. If you had a default value for the job manager in flink-conf.yaml, it wouldn't fail but only display a wrong job manager url. No

Re: flink on yarn - Fatal error in AM: The ContainerLaunchContext was not set

2016-08-23 Thread Maximilian Michels
Hi Mira, Does using the fully-qualified hostname solve the issue? Thanks, Max On Mon, Aug 22, 2016 at 1:38 PM, Miroslav Gajdoš wrote: > Here is the log from yarn application - run on another cluster (this > time cdh5.7.0, but with similar configuration). Check the hostnames; in > configuration,

Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Perfect - This explains it very clearly. Thank you very much! Sameer On Tue, Aug 23, 2016 at 9:31 AM, Tzu-Li (Gordon) Tai wrote: > Slight misunderstanding here. The one thread per Kafka broker happens > *after* the assignment of Kafka partitions to the source instances. So, > with a total of 10

Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi Sameer, I realized you might be a bit confused between “source instances (which in general are Flink tasks)” and “threads” in my previous explanations. The per-broker threads in the Kafka consumer and per-shard threads in the Kinesis consumer I mentioned are threads created by the source instan

Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Lohith Samaga M
Hi, maybe you could use Cassandra to store and fetch all such reference data. This way the reference data can be updated without restarting your application. Lohith Baswaraj Kasture wrote Thanks Kostas ! I am using DataStream API. I have few co

Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Slight misunderstanding here. The one thread per Kafka broker happens *after* the assignment of Kafka partitions to the source instances. So, with a total of 10 partitions and 10 source instances, each source instance will first be assigned 1 partition. Then, each source instance will create 1 thre
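Gordon's two-step description (partitions are assigned to source instances first, then each instance starts one thread per broker leading the partitions it holds) can be modeled in a few lines of Python. The round-robin assignment is an idealized assumption for illustration, not the consumer's actual assignment code, and the broker names are made up.

```python
from collections import defaultdict


def assign_partitions(partitions, parallelism):
    """Idealized round-robin assignment of partitions to source instances (assumption)."""
    assignment = defaultdict(list)
    for i, partition in enumerate(partitions):
        assignment[i % parallelism].append(partition)
    return assignment


def threads_per_instance(assignment, leader_of):
    """One consumer thread per distinct broker leading a partition held by that instance."""
    return {
        instance: len({leader_of[p] for p in parts})
        for instance, parts in assignment.items()
    }
```

With Sameer's setup (1 broker, 10 partitions, parallelism 10), each of the 10 instances gets one partition and starts one thread, which is why all 10 source subtasks receive data. With 10 partitions spread over 2 brokers and parallelism 5, each instance holds two partitions led by different brokers and starts two threads.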

Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Gordon, I tried the following with Kafka - 1 Broker but a topic has 10 partitions. I have a parallelism of 10 defined for the job. I see all my 10 source->Mapper->assignTimestamps receiving and sending data. If there is only one source instance per broker how does that happen? Thanks, Sameer On

Re: How to share text file across tasks at run time in flink.

2016-08-23 Thread Baswaraj Kasture
Thanks Kostas! I am using the DataStream API. I have a few config/property files (key-value text files) and also have business rule files (JSON). These rules and configurations are needed when we process incoming events. Is there any way to share them to task nodes from the driver program? I think this is ve

Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
No, it does not default to Ingestion Time. For other connectors in general, you have to explicitly call `assignTimestampsAndWatermarks()` before the first operator in the topology that works on time (e.g. windows), otherwise the job will fail as soon as records start coming in. Currently, I think onl

Setting up zeppelin with flink

2016-08-23 Thread Frank Dekervel
Hello, I am trying to set up Apache Zeppelin with a Flink cluster (one jobmanager, one task manager). What I did was use the dockerfiles in flink-contrib/docker-flink + the latest binary release of Apache Zeppelin with all interpreters: https://github.com/apache/flink/blob/master/flink-contrib/docke

Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Thanks - Is there also a default behavior for non-Kinesis streams? If I set the time characteristic to Event Time but do not assign timestamps or generate watermarks by invoking the assignTimestampsAndWatermarks function, does that default to using Ingestion Time? Or in other words is it like I in

Re: Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi, For the Kinesis consumer, when you use Event Time but do not explicitly assign timestamps, the Kinesis server-side timestamp (the time at which Kinesis received the record) is attached to the record by default, not Flink's ingestion time. Does this answer your question? Regards, Gordon On Aug

Re: Threading Model for Kinesis

2016-08-23 Thread Sameer W
Thanks Gordon - Appreciate the fast response. Sameer On Tue, Aug 23, 2016 at 7:17 AM, Tzu-Li (Gordon) Tai wrote: > Hi! > > Kinesis shards should be ideally evenly assigned to the source instances. > So, with your example of source parallelism of 10 and 20 shards, each > source instance will hav

Re: FLINK-4329 fix version

2016-08-23 Thread Ufuk Celebi
On Tue, Aug 23, 2016 at 12:28 PM, Yassine Marzougui wrote: > The fix version of FLINK-4329 in JIRA is set to 1.1.1, but 1.1.1 is already > released. Should I expect it to be fixed in the next release? and will a > patch be available meanwhile? Thanks. Hey Yassine! The JIRA fix version tag is inco

Re: Threading Model for Kinesis

2016-08-23 Thread Tzu-Li (Gordon) Tai
Hi! Kinesis shards should be ideally evenly assigned to the source instances. So, with your example of source parallelism of 10 and 20 shards, each source instance will have 2 shards and will have 2 threads consuming them (therefore, not in round robin). For the Kafka consumer, in the source inst
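The arithmetic in Gordon's example (parallelism 10, 20 shards, so 2 shards and 2 consumer threads per source instance) is sketched below in Python. The "shard i goes to instance i mod parallelism" rule is an idealized even assignment assumed for illustration, not the connector's actual assignment code.

```python
def assign_shards(shard_ids, parallelism):
    """Idealized even assignment: the i-th shard goes to source instance i % parallelism."""
    assignment = {instance: [] for instance in range(parallelism)}
    for i, shard in enumerate(sorted(shard_ids)):
        assignment[i % parallelism].append(shard)
    return assignment


def consumer_threads(assignment):
    """One consuming thread per assigned shard (shards are not polled round robin)."""
    return {instance: len(shards) for instance, shards in assignment.items()}
```

For 20 shards named like `shardId-000000000000` and parallelism 10, `consumer_threads` gives 2 for every instance. With fewer shards than source instances, some instances receive no shards in this model and would simply stay idle.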

Threading Model for Kinesis

2016-08-23 Thread Sameer W
Hi, The documentation says that there will be one thread per shard. If my streaming job runs with a parallelism of 10 and there are 20 shards, are more threads going to be launched within a task slot running a source function to consume the additional shards or will one source function instance

Default timestamps for Event Time when no Watermark Assigner used?

2016-08-23 Thread Sameer W
Hi, If you do not explicitly assign timestamps and watermarks when using Event Time, does it automatically default to using Ingestion Time? I was reading the Kinesis integration section and came across the note below, which raised the above question. I saw another place where you explicitly us

FLINK-4329 fix version

2016-08-23 Thread Yassine Marzougui
Hi all, The fix version of FLINK-4329 in JIRA is set to 1.1.1, but 1.1.1 is already released. Should I expect it to be fixed in the next release? and will a patch be available meanwhile? Thanks. Yassine

Re: "Failed to retrieve JobManager address" in Flink 1.1.1 with JM HA

2016-08-23 Thread Ufuk Celebi
You are right that this config key is not needed in this case. The ClusterClient has been refactored between Flink 1.0 and 1.1 and the config parsing might be too strict in this case. It expects the IPC address to be set, which is not necessary as you say. It should be very easy to fix for 1.1.2.

Re: Batch jobs with a very large number of input splits

2016-08-23 Thread Fabian Hueske
Hi Niels, yes, in YARN mode, the default parallelism is the number of available slots. You can change the default task parallelism like this: 1) Use the -p parameter when submitting a job via the CLI client [1] 2) Set a parallelism on the execution environment: env.setParallelism() Best, Fabian
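The precedence between the two settings Fabian lists can be summarized as a small Python function: a parallelism set on the execution environment in the program overrides the CLI `-p` value, and either one overrides the YARN default of all available slots. This is a toy model of the resolution order, not Flink code, and the function and parameter names are hypothetical.

```python
from typing import Optional


def effective_parallelism(
    available_slots: int,
    cli_p: Optional[int] = None,
    env_parallelism: Optional[int] = None,
) -> int:
    """Toy model of how a job's default parallelism is chosen on YARN.

    env.setParallelism() in the program takes precedence over the CLI's -p flag,
    and both take precedence over the YARN default (all available slots).
    """
    for candidate in (env_parallelism, cli_p):
        if candidate is not None:
            if candidate < 1:
                raise ValueError("parallelism must be >= 1")
            return candidate
    return available_slots
```

In Niels' case (a yarn-session with 50 slots and neither setting given) the model returns 50, which matches the parallelism the job actually ran with.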

Re: Batch jobs with a very large number of input splits

2016-08-23 Thread Niels Basjes
I did more digging and finally understand what goes wrong. I create a yarn-session with 50 slots. Then I run my job that (due to the fact that my HBase table has 100s of regions) has a lot of input splits. The job then runs with parallelism 50 because I did not specify the value. As a consequence th