Re: Optimal Configuration for Cluster

2016-02-23 Thread Welly Tambunan
Hi Ufuk,

Thanks for the explanation.

Yes. Our jobs are all streaming jobs.

Cheers

On Tue, Feb 23, 2016 at 2:48 PM, Ufuk Celebi  wrote:

> The new default is equivalent to the previous "streaming mode". The
> community decided to get rid of this distinction, because it was
> confusing to users.
>
> The difference between "streaming mode" and "batch mode" was how
> Flink's managed memory was allocated, either lazily when required
> ('streaming mode") or eagerly on task manager start up ("batch mode").
> Now it's lazy by default.
>
> This is not something you need to worry about, but if you are mostly
> using the DataSet API where pre allocation has benefits, you can get
> the "batch mode" behaviour by using the following configuration key:
>
> taskmanager.memory.preallocate: true
>
> But you are using the DataStream API anyways, right?
>
> – Ufuk
>
>
> On Tue, Feb 23, 2016 at 6:36 AM, Welly Tambunan  wrote:
> > Hi Fabian,
> >
> > Previously when using flink 0.9-0.10 we start the cluster with streaming
> > mode or batch mode. I see that this one is gone on Flink 1.00 snapshot ?
> So
> > this one has already taken care of the flink and optimize by runtime >
> >
> > On Mon, Feb 22, 2016 at 5:26 PM, Fabian Hueske 
> wrote:
> >>
> >> Hi Welly,
> >>
> >> sorry for the late response.
> >>
> >> The number of network buffers primarily depends on the maximum
> parallelism
> >> of your job.
> >> The given formula assumes a specific cluster configuration (1 task
> manager
> >> per machine, one parallel task per CPU).
> >> The formula can be translated to:
> >>
> >> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
> >>
> >> where p is the maximum parallelism of the job and t is the number of
> task
> >> manager.
> >> You can process more than one parallel task per TM if you configure more
> >> than one processing slot per machine ( taskmanager.numberOfTaskSlots).
> The
> >> TM will divide its memory among all its slots. So it would be possible
> to
> >> start one TM for each machine with 100GB+ memory and 48 slots each.
> >>
> >> We can compute the number of network buffers if you give a few more
> >> details about your setup:
> >> - How many task managers do you start? I assume more than one TM per
> >> machine given that you assign only 4GB of memory out of 128GB to each
> TM.
> >> - What is the maximum parallelism of you program?
> >> - How many processing slots do you configure for each TM?
> >>
> >> In general, pipelined shuffles with a high parallelism require a lot of
> >> memory.
> >> If you configure batch instead of pipelined transfer, the memory
> >> requirement goes down
> >> (ExecutionConfig.setExecutionMode(ExecutionMode.BATCH)).
> >>
> >> Eventually, we want to merge the network buffer and the managed memory
> >> pools. So the "taskmanager.network.numberOfBuffers" configuration whill
> >> hopefully disappear at some point in the future.
> >>
> >> Best, Fabian
> >>
> >> 2016-02-19 9:34 GMT+01:00 Welly Tambunan :
> >>>
> >>> Hi All,
> >>>
> >>> We are trying to running our job in cluster that has this information
> >>>
> >>> 1. # of machine: 16
> >>> 2. memory : 128 gb
> >>> 3. # of core : 48
> >>>
> >>> However when we try to run we have an exception.
> >>>
> >>> "insufficient number of network buffers. 48 required but only 10
> >>> available. the total number of network buffers is currently set to
> 2048"
> >>>
> >>> After looking at the documentation we set configuration based on docs
> >>>
> >>> taskmanager.network.numberOfBuffers: # core ^ 2 * # machine * 4
> >>>
> >>> However we face another error from JVM
> >>>
> >>> java.io.IOException: Cannot allocate network buffer pool: Could not
> >>> allocate enough memory segments for NetworkBufferPool (required (Mb):
> 2304,
> >>> allocated (Mb): 698, missing (Mb): 1606). Cause: Java heap space
> >>>
> >>> We fiddle the taskmanager.heap.mb: 4096
> >>>
> >>> Finally the cluster is running.
> >>>
> >>> However i'm still not sure about the configuration and fiddling in task
> >>> manager heap really fine tune. So my question is
> >>>
> >>> Am i doing it right for numberOfBuffers ?
> >>> How much should we allocate on taskmanager.heap.mb given the
> information
> >>> Any suggestion which configuration we need to set to make it optimal
> for
> >>> the cluster ?
> >>> Is there any chance that this will get automatically resolve by
> >>> memory/network buffer manager ?
> >>>
> >>> Thanks a lot for the help
> >>>
> >>> Cheers
> >>>
> >>> --
> >>> Welly Tambunan
> >>> Triplelands
> >>>
> >>> http://weltam.wordpress.com
> >>> http://www.triplelands.com
> >>
> >>
> >
> >
> >
> > --
> > Welly Tambunan
> > Triplelands
> >
> > http://weltam.wordpress.com
> > http://www.triplelands.com
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Use jvm to run flink on single-node machine with many cores

2016-02-23 Thread Ana M. Martinez
Hi all,

Thank you very much for your help. It worked perfectly like this:


Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.numberOfBuffers", 16000);
conf.setInteger("taskmanager.numberOfTaskSlots", 32);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

env.setParallelism(32);

I believe that setting taskmanager.numberOfTaskSlots is not necessary, but 
setParallelism is, as the default parallelism is 1.

Best regards,
Ana

On 22 Feb 2016, at 10:37, Ufuk Celebi wrote:

Note that the method to call in the example should be
`conf.setInteger` and the second argument not a String but an int.

On Sun, Feb 21, 2016 at 1:41 PM, Márton Balassi wrote:
Dear Ana,

If you are using a single machine with multiple cores, but need convenient
access to the configuration I would personally recommend using the local
cluster option in the flink distribution. [1] If you want to avoid having a
flink distro on the machine, then Robert's solution is the way to go.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/setup/local_setup.html

On Sun, Feb 21, 2016 at 1:34 PM, Ana M. Martinez  wrote:

Hi all,

I am trying to run a program using the flink java library with
ExecutionEnvironment.getExecutionEnvironment() from the command line using
java -jar.

If I run the code in my machine (with four cores) or in a multi-node
cluster (using yarn) the program runs normally, but if I want to run it on a
machine with a single node and 32 cores using java -jar I get the following
error:

02/21/2016 13:33:09 MapPartition (MapPartition at
toBatches(ConversionToBatches.java:55))(29/32) switched to FAILED
java.io.IOException: Insufficient number of network buffers: required 1,
but only 0 available. The total number of network buffers is currently set
to 2048. You can increase this number by setting the configuration key
'taskmanager.network.numberOfBuffers'.
at
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
at
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:325)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
at java.lang.Thread.run(Thread.java:745)

In this case (java -jar), I don’t know if or how I can increase the number
of network buffers. Is there a way to do it without having to use yarn (as I
don’t have hadoop installed)?

Thanks,
Ana





Re: Problem with Kafka 0.9 Client

2016-02-23 Thread Lopez, Javier
Hi Robert,

After we restarted our Kafka / Zookeeper cluster the consumer worked. Some
of our topics had some problems. Flink's consumer for Kafka 0.9 works
as expected.

Thanks!

On 19 February 2016 at 12:03, Lopez, Javier  wrote:

> Hi, these are the properties:
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons
> properties.setProperty("zookeeper.connect", ".37:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("client.id", "flink_test");
> properties.setProperty("auto.offset.reset", "earliest");
> properties.put("enable.auto.commit", "true");
> properties.put("auto.commit.interval.ms", "1000");
> properties.put("session.timeout.ms", "3");
>
> We have tested with these as well:
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092");
> properties.setProperty("zookeeper.connect", ".37:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("client.id", "flink_test");
> properties.setProperty("auto.offset.reset", "earliest");
>
>
> and these:
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092");
> properties.setProperty("zookeeper.connect", ".37:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("client.id", "flink_test");
> properties.setProperty("auto.offset.reset", "earliest");
> properties.put("enable.auto.commit", "true");
> properties.put("auto.commit.interval.ms", "1000");
> properties.put("session.timeout.ms", "3");
> properties.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> properties.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> With all three different configurations we get the same result.
>
> On 19 February 2016 at 11:55, Robert Metzger  wrote:
>
>> Thank you. Can you send me also the list of properties you are passing to
>> the kafka consumer? Are you only setting the "bootstrap.servers" or more?
>>
>> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier 
>> wrote:
>>
>>> Hi Robert,
>>>
>>> Please find attached the full logs of one of our latest executions. We
>>> are basically trying to read from our kafka cluster and then writing the
>>> data to elasticsearch.
>>>
>>> Thanks for your help!
>>>
>>> On 18 February 2016 at 11:19, Robert Metzger 
>>> wrote:
>>>
 Hi Javier,

 sorry for the late response. In the Error Mapping of Kafka, it says
 that code 15 means: ConsumerCoordinatorNotAvailableCode.

 https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala

 How many brokers did you put into the list of bootstrap servers?
 Can you maybe send me the full log of one of the Flink TaskManagers
 reading from Kafka?


 On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier <
 javier.lo...@zalando.de> wrote:

> Hi guys,
>
> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have
> not been able to retrieve data from our Kafka Cluster. The DEBUG data
> reports the following:
>
> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
>- Sending metadata request ClientRequest(expectResponse=true,
> callback=null,
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
> body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
> createdTimeMs=1455702804364, sendTimeMs=0) to node 35
> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
>   - Updated cluster metadata version 838 to Cluster(nodes =
> [Node(41, ip-.eu-west-1.compute.internal, 9092), Node(35,
> ip-.eu-west-1.compute.internal, 9092), Node(87,
> ip-.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic 
> =
> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = 
> stream_test_3,
> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
> 10:53:24,398 DEBUG
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
> group metadata request to broker 35
> 10:53:24,432 DEBUG
> org.apache.kafka.clients.consumer.internals.Abstrac

Re: Use jvm to run flink on single-node machine with many cores

2016-02-23 Thread Ufuk Celebi
On Tue, Feb 23, 2016 at 10:17 AM, Ana M. Martinez  wrote:
> I believe that setting taskmanager.numberOfTaskSlots is not necessary, but
> setParallelism is, as by default 1 was taken.

Yes, the number of slots in local execution defaults to the maximum
parallelism of the job.


Re: Problem with Kafka 0.9 Client

2016-02-23 Thread Robert Metzger
Great. That's good news. Let us know if you encounter more issues with the
Kafka connector.

By the way, Kafka released 0.9.0.1, maybe updating your brokers to that
version resolves the issues? (Maybe the problems of some of the topics were
caused by bugs in Kafka)
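For readers following along, a minimal sketch of how the 0.9 consumer is typically
wired up (broker addresses are placeholders and imports are omitted; the topic name
is the one from the logs in this thread):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
properties.setProperty("group.id", "test");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer09<String>("stream_test_3", new SimpleStringSchema(), properties));
stream.print();
env.execute("Kafka 0.9 read");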

On Tue, Feb 23, 2016 at 10:23 AM, Lopez, Javier 
wrote:

> Hi Robert,
>
> After we restarted our Kafka / Zookeeper cluster the consumer worked. Some
> of our topics had some problems. The flink's consumer for Kafka 0.9 works
> as expected.
>
> Thanks!
>
> On 19 February 2016 at 12:03, Lopez, Javier 
> wrote:
>
>> Hi, these are the properties:
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers",
>> ".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons
>> properties.setProperty("zookeeper.connect", ".37:2181");
>> properties.setProperty("group.id", "test");
>> properties.setProperty("client.id", "flink_test");
>> properties.setProperty("auto.offset.reset", "earliest");
>> properties.put("enable.auto.commit", "true");
>> properties.put("auto.commit.interval.ms", "1000");
>> properties.put("session.timeout.ms", "3");
>>
>> We have tested with these as well:
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers",
>> ".87:9092,.41:9092,.35:9092");
>> properties.setProperty("zookeeper.connect", ".37:2181");
>> properties.setProperty("group.id", "test");
>> properties.setProperty("client.id", "flink_test");
>> properties.setProperty("auto.offset.reset", "earliest");
>>
>>
>> and these:
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers",
>> ".87:9092,.41:9092,.35:9092");
>> properties.setProperty("zookeeper.connect", ".37:2181");
>> properties.setProperty("group.id", "test");
>> properties.setProperty("client.id", "flink_test");
>> properties.setProperty("auto.offset.reset", "earliest");
>> properties.put("enable.auto.commit", "true");
>> properties.put("auto.commit.interval.ms", "1000");
>> properties.put("session.timeout.ms", "3");
>> properties.put("key.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>> properties.put("value.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>>
>> With all three different configurations we get the same result.
>>
>> On 19 February 2016 at 11:55, Robert Metzger  wrote:
>>
>>> Thank you. Can you send me also the list of properties you are passing
>>> to the kafka consumer? Are you only setting the "bootstrap.servers" or more?
>>>
>>> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier >> > wrote:
>>>
 Hi Robert,

 Please find attached the full logs of one of our latest executions. We
 are basically trying to read from our kafka cluster and then writing the
 data to elasticsearch.

 Thanks for your help!

 On 18 February 2016 at 11:19, Robert Metzger 
 wrote:

> Hi Javier,
>
> sorry for the late response. In the Error Mapping of Kafka, it says
> that code 15 means: ConsumerCoordinatorNotAvailableCode.
>
> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
>
> How many brokers did you put into the list of bootstrap servers?
> Can you maybe send me the full log of one of the Flink TaskManagers
> reading from Kafka?
>
>
> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier <
> javier.lo...@zalando.de> wrote:
>
>> Hi guys,
>>
>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have
>> not been able to retrieve data from our Kafka Cluster. The DEBUG data
>> reports the following:
>>
>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
>>- Sending metadata request ClientRequest(expectResponse=true,
>> callback=null,
>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35
>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
>> - Updated cluster metadata version 838 to Cluster(nodes =
>> [Node(41, ip-.eu-west-1.compute.internal, 9092), Node(35,
>> ip-.eu-west-1.compute.internal, 9092), Node(87,
>> ip-.eu-west-1.compute.internal, 9092)], partitions = 
>> [Partition(topic =
>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = 
>> stream_test_3,
>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
>> Partit

Re: Optimal Configuration for Cluster

2016-02-23 Thread Welly Tambunan
Hi Ufuk and Fabian,

Is it better to start 48 task managers (one slot each) on one machine than a
single task manager with 48 slots? Are there any trade-offs we should know
about?

Cheers

On Tue, Feb 23, 2016 at 3:03 PM, Welly Tambunan  wrote:

> Hi Ufuk,
>
> Thanks for the explanation.
>
> Yes. Our jobs is all streaming job.
>
> Cheers
>
> On Tue, Feb 23, 2016 at 2:48 PM, Ufuk Celebi  wrote:
>
>> The new default is equivalent to the previous "streaming mode". The
>> community decided to get rid of this distinction, because it was
>> confusing to users.
>>
>> The difference between "streaming mode" and "batch mode" was how
>> Flink's managed memory was allocated, either lazily when required
>> ('streaming mode") or eagerly on task manager start up ("batch mode").
>> Now it's lazy by default.
>>
>> This is not something you need to worry about, but if you are mostly
>> using the DataSet API where pre allocation has benefits, you can get
>> the "batch mode" behaviour by using the following configuration key:
>>
>> taskmanager.memory.preallocate: true
>>
>> But you are using the DataStream API anyways, right?
>>
>> – Ufuk
>>
>>
>> On Tue, Feb 23, 2016 at 6:36 AM, Welly Tambunan 
>> wrote:
>> > Hi Fabian,
>> >
>> > Previously when using flink 0.9-0.10 we start the cluster with streaming
>> > mode or batch mode. I see that this one is gone on Flink 1.00 snapshot
>> ? So
>> > this one has already taken care of the flink and optimize by runtime >
>> >
>> > On Mon, Feb 22, 2016 at 5:26 PM, Fabian Hueske 
>> wrote:
>> >>
>> >> Hi Welly,
>> >>
>> >> sorry for the late response.
>> >>
>> >> The number of network buffers primarily depends on the maximum
>> parallelism
>> >> of your job.
>> >> The given formula assumes a specific cluster configuration (1 task
>> manager
>> >> per machine, one parallel task per CPU).
>> >> The formula can be translated to:
>> >>
>> >> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>> >>
>> >> where p is the maximum parallelism of the job and t is the number of
>> task
>> >> manager.
>> >> You can process more than one parallel task per TM if you configure
>> more
>> >> than one processing slot per machine ( taskmanager.numberOfTaskSlots).
>> The
>> >> TM will divide its memory among all its slots. So it would be possible
>> to
>> >> start one TM for each machine with 100GB+ memory and 48 slots each.
>> >>
>> >> We can compute the number of network buffers if you give a few more
>> >> details about your setup:
>> >> - How many task managers do you start? I assume more than one TM per
>> >> machine given that you assign only 4GB of memory out of 128GB to each
>> TM.
>> >> - What is the maximum parallelism of you program?
>> >> - How many processing slots do you configure for each TM?
>> >>
>> >> In general, pipelined shuffles with a high parallelism require a lot of
>> >> memory.
>> >> If you configure batch instead of pipelined transfer, the memory
>> >> requirement goes down
>> >> (ExecutionConfig.setExecutionMode(ExecutionMode.BATCH)).
>> >>
>> >> Eventually, we want to merge the network buffer and the managed memory
>> >> pools. So the "taskmanager.network.numberOfBuffers" configuration whill
>> >> hopefully disappear at some point in the future.
>> >>
>> >> Best, Fabian
>> >>
>> >> 2016-02-19 9:34 GMT+01:00 Welly Tambunan :
>> >>>
>> >>> Hi All,
>> >>>
>> >>> We are trying to running our job in cluster that has this information
>> >>>
>> >>> 1. # of machine: 16
>> >>> 2. memory : 128 gb
>> >>> 3. # of core : 48
>> >>>
>> >>> However when we try to run we have an exception.
>> >>>
>> >>> "insufficient number of network buffers. 48 required but only 10
>> >>> available. the total number of network buffers is currently set to
>> 2048"
>> >>>
>> >>> After looking at the documentation we set configuration based on docs
>> >>>
>> >>> taskmanager.network.numberOfBuffers: # core ^ 2 * # machine * 4
>> >>>
>> >>> However we face another error from JVM
>> >>>
>> >>> java.io.IOException: Cannot allocate network buffer pool: Could not
>> >>> allocate enough memory segments for NetworkBufferPool (required (Mb):
>> 2304,
>> >>> allocated (Mb): 698, missing (Mb): 1606). Cause: Java heap space
>> >>>
>> >>> We fiddle the taskmanager.heap.mb: 4096
>> >>>
>> >>> Finally the cluster is running.
>> >>>
>> >>> However i'm still not sure about the configuration and fiddling in
>> task
>> >>> manager heap really fine tune. So my question is
>> >>>
>> >>> Am i doing it right for numberOfBuffers ?
>> >>> How much should we allocate on taskmanager.heap.mb given the
>> information
>> >>> Any suggestion which configuration we need to set to make it optimal
>> for
>> >>> the cluster ?
>> >>> Is there any chance that this will get automatically resolve by
>> >>> memory/network buffer manager ?
>> >>>
>> >>> Thanks a lot for the help
>> >>>
>> >>> Cheers
>> >>>
>> >>> --
>> >>> Welly Tambunan
>> >>> Triplelands
>> >>>
>> >>> http://weltam.wordpress.com
>> >>> http://www.triplelands.com
>> >>
>> >>
>> >
>> >

Re: streaming hdfs sub folders

2016-02-23 Thread Martin Neumann
I'm not very familiar with the inner workings of the InputFormats. Calling
.open() got rid of the NullPointerException, but the stream still produces no output.

As a temporary solution I wrote a batch job that just unions all the
different datasets and puts them (sorted) into a single folder.
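For reference, a rough sketch of that kind of union-style batch job, assuming the
same EndSongCleanedPq Avro records and HDFS layout as in the code further down;
the sort field and output path are made up, and imports are omitted:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read each sub folder with the AvroInputFormat and union the results
DataSet<EndSongCleanedPq> all = null;
for (String sub : new String[]{"00", "01", "02", "03", "22", "23"}) {
    DataSet<EndSongCleanedPq> part = env.createInput(new AvroInputFormat<EndSongCleanedPq>(
        new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + sub), EndSongCleanedPq.class));
    all = (all == null) ? part : all.union(part);
}

// sort on a (made-up) timestamp field and write everything into one output folder
all.sortPartition("timestamp", Order.ASCENDING).setParallelism(1)
    .writeAsText("hdfs:///anonym/merged/endsong/2016-01-01")
    .setParallelism(1); // a single writer keeps the output totally ordered

env.execute("union endsong sub folders");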

cheers Martin

On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger  wrote:

> Hi Martin,
>
> where is the null pointer exception thrown?
> I think you didn't call the open() method of the AvroInputFormat. Maybe
> that's the issue.
>
> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann  wrote:
>
>> I tried to implement your idea but I'm getting NullPointer exceptions
>> from the AvroInputFormat any Idea what I'm doing wrong?
>> See the code below:
>>
>> public static void main(String[] args) throws Exception {
>>
>> // set up the execution environment
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setParallelism(1);
>>
>> env.fromElements("00", "01", "02","03","22","23")
>> .flatMap(new FileExtractor())
>> .filter(new LocationFiter())
>> .flatMap(new PreProcessEndSongClean())
>> .writeAsCsv(outPath);
>>
>>
>> env.execute("something");
>> }
>>
>> private static class FileExtractor implements FlatMapFunction<String, EndSongCleanedPq> {
>>
>> @Override
>> public void flatMap(String s, Collector<EndSongCleanedPq> collector) throws Exception {
>> AvroInputFormat<EndSongCleanedPq> avroInputFormat = new AvroInputFormat<EndSongCleanedPq>(new 
>> Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), EndSongCleanedPq.class);
>> avroInputFormat.setReuseAvroValue(false);
>> while (! avroInputFormat.reachedEnd()){
>> EndSongCleanedPq res = avroInputFormat.nextRecord(new EndSongCleanedPq());
>> if (res != null) collector.collect(res);
>> }
>> }
>> }
>>
>>
>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann  wrote:
>>
>>> I guess I need to set the parallelism for the FlatMap to 1 to make sure
>>> I read one file at a time. The downside I see with this is that I will be
>>> not able to read in parallel from HDFS (and the files are Huge).
>>>
>>> I give it a try and see how much performance I loose.
>>>
>>> cheers Martin
>>>
>>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:
>>>
 Martin,

 I think you can approximate this in an easy way like this:

   - On the client, you traverse your directories to collect all files
 that you need, collect all file paths in a list.
   - Then you have a source "env.fromElements(paths)".
   - Then you flatMap and in the FlatMap, run the Avro input format
 (open it per path, then call it to get all elements)

 That gives you pretty much full control about in which order the files
 should be processed.

 What do you think?

 Stephan


 On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann 
 wrote:

> I forgot to mention I'm using an AvroInputFormat to read the file
> (that might be relevant how the flag needs to be applied)
> See the code Snipped below:
>
> DataStream<EndSongCleanedPq> inStream =
> env.readFile(new AvroInputFormat<EndSongCleanedPq>(new 
> Path(filePath), EndSongCleanedPq.class), filePath);
>
>
> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
> wrote:
>
>> The program is a DataStream program, it usually it gets the data from
>> kafka. It's an anomaly detection program that learns from the stream
>> itself. The reason I want to read from files is to test different 
>> settings
>> of the algorithm and compare them.
>>
>> I think I don't need to reply things in the exact order (wich is not
>> possible with parallel reads anyway) and I have written the program so it
>> can deal with out of order events.
>> I only need the subfolders to be processed roughly in order. Its fine
>> to process some stuff from 01 before everything from 00 is finished, if I
>> get records from all 24 subfolders at the same time things will break
>> though. If I set the flag will it try to get data from all sub dir's in
>> parallel or will it go sub dir by sub dir?
>>
>> Also can you point me to some documentation or something where I can
>> see how to set the Flag?
>>
>> cheers Martin
>>
>>
>>
>>
>> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen 
>> wrote:
>>
>>> Hi!
>>>
>>> Going through nested folders is pretty simple, there is a flag on
>>> the FileInputFormat that makes sure those are read.
>>>
>>> Tricky is the part that all "00" files should be read before the
>>> "01" files. If you still want parallel reads, that means you need to 
>>> sync
>>> at some point, wait for all parallel parts to finish with the "00" work
>>> before anyone may start with the "01" work.
>>>
>>> Is your training program a DataStream or a DataSet program?`

Best way to process data in many files? (FLINK-BATCH)

2016-02-23 Thread Tim Conrad



Dear FLINK community.

I was wondering what would be the recommended (best?) way to achieve 
some kind of file conversion that runs in parallel on all available 
Flink nodes, since it is "embarrassingly parallel" (no dependency 
between files).


Say I have an HDFS folder that contains multiple structured text files 
containing (x,y) pairs (think of CSV).


For each of these files I want to do the following (individually for each 
file):


* Read file from HDFS
* Extract dataset(s) from file (e.g. list of (x,y) pairs)
* Apply some filter (e.g. smoothing)
* Do some pattern recognition on smoothed data
* Write results back to HDFS (different format)

Would the following be a good idea?

DataSource fileList = ... // contains list of file names in HDFS

// For each "filename" in list do...
DataSet featureList = fileList
    .flatMap(new ReadDataSetFromFile()) // flatMap because there might be multiple DataSets in a file
    .map(new Smoothing())
    .map(new FindPatterns());

featureList.writeAsFormattedText( ... )

I have the feeling that Flink does not distribute the independent tasks 
on the available nodes but executes everything on only one node.



Cheers
Tim





Re: Optimal Configuration for Cluster

2016-02-23 Thread Ufuk Celebi
I would go with one task manager with 48 slots per machine. This
reduces the communication overheads between task managers.

Regarding memory configuration: Given that the machines have plenty of
memory, I would configure a bigger heap than the 4 GB you had
previously. Furthermore, you can also consider adding more network
buffers, which should improve job throughput.
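To make that concrete, one possible flink-conf.yaml for the 16 machines with
128 GB of memory and 48 cores described in this thread (the numbers are only
illustrative, not a recommendation):

taskmanager.numberOfTaskSlots: 48
taskmanager.heap.mb: 98304                     # e.g. ~96 GB instead of the previous 4 GB
taskmanager.network.numberOfBuffers: 147456    # 48^2 * 16 * 4, the formula from earlier in this thread
# with the default 32 KB buffer size this reserves roughly 4.5 GB per task manager for network buffers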

– Ufuk

On Tue, Feb 23, 2016 at 11:57 AM, Welly Tambunan  wrote:
> Hi Ufuk and Fabian,
>
> Is that better to start 48 task manager ( one slot each ) in one machine
> than having single task manager with 48 slot ? Any trade-off that we should
> know etc ?
>
> Cheers
>
> On Tue, Feb 23, 2016 at 3:03 PM, Welly Tambunan  wrote:
>>
>> Hi Ufuk,
>>
>> Thanks for the explanation.
>>
>> Yes. Our jobs is all streaming job.
>>
>> Cheers
>>
>> On Tue, Feb 23, 2016 at 2:48 PM, Ufuk Celebi  wrote:
>>>
>>> The new default is equivalent to the previous "streaming mode". The
>>> community decided to get rid of this distinction, because it was
>>> confusing to users.
>>>
>>> The difference between "streaming mode" and "batch mode" was how
>>> Flink's managed memory was allocated, either lazily when required
>>> ('streaming mode") or eagerly on task manager start up ("batch mode").
>>> Now it's lazy by default.
>>>
>>> This is not something you need to worry about, but if you are mostly
>>> using the DataSet API where pre allocation has benefits, you can get
>>> the "batch mode" behaviour by using the following configuration key:
>>>
>>> taskmanager.memory.preallocate: true
>>>
>>> But you are using the DataStream API anyways, right?
>>>
>>> – Ufuk
>>>
>>>
>>> On Tue, Feb 23, 2016 at 6:36 AM, Welly Tambunan 
>>> wrote:
>>> > Hi Fabian,
>>> >
>>> > Previously when using flink 0.9-0.10 we start the cluster with
>>> > streaming
>>> > mode or batch mode. I see that this one is gone on Flink 1.00 snapshot
>>> > ? So
>>> > this one has already taken care of the flink and optimize by runtime >
>>> >
>>> > On Mon, Feb 22, 2016 at 5:26 PM, Fabian Hueske 
>>> > wrote:
>>> >>
>>> >> Hi Welly,
>>> >>
>>> >> sorry for the late response.
>>> >>
>>> >> The number of network buffers primarily depends on the maximum
>>> >> parallelism
>>> >> of your job.
>>> >> The given formula assumes a specific cluster configuration (1 task
>>> >> manager
>>> >> per machine, one parallel task per CPU).
>>> >> The formula can be translated to:
>>> >>
>>> >> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>>> >>
>>> >> where p is the maximum parallelism of the job and t is the number of
>>> >> task
>>> >> manager.
>>> >> You can process more than one parallel task per TM if you configure
>>> >> more
>>> >> than one processing slot per machine ( taskmanager.numberOfTaskSlots).
>>> >> The
>>> >> TM will divide its memory among all its slots. So it would be possible
>>> >> to
>>> >> start one TM for each machine with 100GB+ memory and 48 slots each.
>>> >>
>>> >> We can compute the number of network buffers if you give a few more
>>> >> details about your setup:
>>> >> - How many task managers do you start? I assume more than one TM per
>>> >> machine given that you assign only 4GB of memory out of 128GB to each
>>> >> TM.
>>> >> - What is the maximum parallelism of you program?
>>> >> - How many processing slots do you configure for each TM?
>>> >>
>>> >> In general, pipelined shuffles with a high parallelism require a lot
>>> >> of
>>> >> memory.
>>> >> If you configure batch instead of pipelined transfer, the memory
>>> >> requirement goes down
>>> >> (ExecutionConfig.setExecutionMode(ExecutionMode.BATCH)).
>>> >>
>>> >> Eventually, we want to merge the network buffer and the managed memory
>>> >> pools. So the "taskmanager.network.numberOfBuffers" configuration
>>> >> whill
>>> >> hopefully disappear at some point in the future.
>>> >>
>>> >> Best, Fabian
>>> >>
>>> >> 2016-02-19 9:34 GMT+01:00 Welly Tambunan :
>>> >>>
>>> >>> Hi All,
>>> >>>
>>> >>> We are trying to running our job in cluster that has this information
>>> >>>
>>> >>> 1. # of machine: 16
>>> >>> 2. memory : 128 gb
>>> >>> 3. # of core : 48
>>> >>>
>>> >>> However when we try to run we have an exception.
>>> >>>
>>> >>> "insufficient number of network buffers. 48 required but only 10
>>> >>> available. the total number of network buffers is currently set to
>>> >>> 2048"
>>> >>>
>>> >>> After looking at the documentation we set configuration based on docs
>>> >>>
>>> >>> taskmanager.network.numberOfBuffers: # core ^ 2 * # machine * 4
>>> >>>
>>> >>> However we face another error from JVM
>>> >>>
>>> >>> java.io.IOException: Cannot allocate network buffer pool: Could not
>>> >>> allocate enough memory segments for NetworkBufferPool (required (Mb):
>>> >>> 2304,
>>> >>> allocated (Mb): 698, missing (Mb): 1606). Cause: Java heap space
>>> >>>
>>> >>> We fiddle the taskmanager.heap.mb: 4096
>>> >>>
>>> >>> Finally the cluster is running.
>>> >>>
>>> >>> However i'm still not sure about the configuration and fiddling in
>>> >>> 

Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-23 Thread Till Rohrmann
Hi Tim,

depending on how you create the DataSource fileList, Flink will
schedule the downstream operators differently. If you used the
ExecutionEnvironment.fromCollection method, then it will create a DataSource
with a CollectionInputFormat. This kind of DataSource will only be executed
with a degree of parallelism of 1. The source will send its collection
elements in a round robin fashion to the downstream operators which are
executed with a higher parallelism. So when Flink schedules the downstream
operators, it will try to place them close to their inputs. Since all flat
map operators have the single data source task as an input, they will be
deployed on the same machine if possible.

In contrast, if you had a parallel data source which would consist of
multiple source tasks, then these tasks would be independent and spread out
across your cluster. In this case, every flat map task would have a single
distinct source task as input. When the flat map tasks are deployed they
would be deployed on the machine where their corresponding source is
running. Since the source tasks are spread out across the cluster, the flat
map tasks would be spread out as well.
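
As a small illustration of the difference (paths are made up):
fromCollection/fromElements create a non-parallel source backed by the
CollectionInputFormat, whereas the file-based sources are split and run with
the configured parallelism.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// non-parallel source: CollectionInputFormat, always runs with a parallelism of 1
DataSet<String> fileNames = env.fromElements("hdfs:///data/a.csv", "hdfs:///data/b.csv");

// parallel source: the underlying FileInputFormat generates input splits,
// so the source tasks are spread across the cluster
DataSet<String> lines = env.readTextFile("hdfs:///data/");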

What you could do to mitigate your problem is to start the cluster with as
many slots as your maximum degree of parallelism is. That way, you’ll
utilize all cluster resources.

I hope this clarifies a bit why you observe that tasks tend to cluster on a
single machine.

Cheers,
Till
​

On Tue, Feb 23, 2016 at 1:49 PM, Tim Conrad 
wrote:

>
>
> Dear FLINK community.
>
> I was wondering what would be the recommended (best?) way to achieve some
> kind of file conversion. That runs in parallel on all available Flink
> Nodes, since it it "embarrassingly parallel" (no dependency between files).
>
> Say, I have a HDFS folder that contains multiple structured text-files
> containing (x,y) pairs (think of CVS).
>
> For each of these files I want to do (individual for each file) the
> following:
>
> * Read file from HDFS
> * Extract dataset(s) from file (e.g. list of (x,y) pairs)
> * Apply some filter (e.g. smoothing)
> * Do some pattern recognition on smoothed data
> * Write results back to HDFS (different format)
>
> Would the following be a good idea?
>
> DataSource fileList = ... // contains list of file names in HDFS
>
> // For each "filename" in list do...
> DataSet featureList = fileList
> .flatMap(new ReadDataSetFromFile()) // flatMap because
> there might multiple DataSets in a file
> .map(new Smoothing())
> .map(new FindPatterns());
>
> featureList.writeAsFormattedText( ... )
>
> I have the feeling that Flink does not distribute the independent tasks on
> the available nodes but executes everything on only one node.
>
>
> Cheers
> Tim
>
>
>
>


Re: Dataset filter improvement

2016-02-23 Thread Maximilian Michels
Hi Flavio,

I think the point is that Flink can use its serialization tools if you
register the class in advance. If you don't do that, it will use Kryo
as a fall-back which is slightly less efficient.

Equals and hash code have to be implemented correctly if you compare
Pojos. For standard types like String or Integer, this is done
automatically. For Pojos, Flink doesn't know whether it is implemented
correctly or not. Every object in Java has a default equals and
hashCode implementation.
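
For illustration, a small sketch of both points (the POJO and its field are made
up; ExtendedClass1/2 are the classes from the quoted thread below):

// register subclasses up front so Flink can use its own serializers instead of the Kryo fall-back
env.registerType(ExtendedClass1.class);
env.registerType(ExtendedClass2.class);

// a POJO that is used as a key should implement equals and hashCode consistently
public class MyPojo {
    public String id;

    @Override
    public boolean equals(Object o) {
        return o instanceof MyPojo && java.util.Objects.equals(id, ((MyPojo) o).id);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(id);
    }
}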

Cheers,
Max

On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
 wrote:
> Hi Max,
> why do I need to register them? My job runs without problem also without
> that.
> The only problem with my POJOs was that I had to implement equals and hash
> correctly, Flink didn't enforce me to do it but then results were wrong :(
>
>
>
> On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels  wrote:
>>
>> Hi Flavio,
>>
>> Stephan was referring to
>>
>> env.registerType(ExtendedClass1.class);
>> env.registerType(ExtendedClass2.class);
>>
>> Cheers,
>> Max
>>
>> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>>  wrote:
>> > What do you mean exactly..? Probably I'm missing something
>> > here..remember
>> > that I can specify the right subClass only after the last flatMap, after
>> > the
>> > first map neither me nor Flink can know the exact subclass of BaseClass
>> >
>> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen  wrote:
>> >>
>> >> Class hierarchies should definitely work, even if the base class has no
>> >> fields.
>> >>
>> >> They work more efficiently if you register the subclasses at the
>> >> execution
>> >> environment (Flink cannot infer them from the function signatures
>> >> because
>> >> the function signatures only contain the abstract base class).
>> >>
>> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> >>  wrote:
>> >>>
>> >>> Because The classes are not related to each other. Do you think it's a
>> >>> good idea to have something like this?
>> >>>
>> >>> abstract class BaseClass(){
>> >>>String someField;
>> >>> }
>> >>>
>> >>> class ExtendedClass1 extends BaseClass (){
>> >>>String someOtherField11;
>> >>>String someOtherField12;
>> >>>String someOtherField13;
>> >>>  ...
>> >>> }
>> >>>
>> >>> class ExtendedClass2 extends BaseClass (){
>> >>>Integer someOtherField21;
>> >>>Double someOtherField22;
>> >>>Integer someOtherField23;
>> >>>  ...
>> >>> }
>> >>>
>> >>> and then declare my map as Map. and then apply a
>> >>> flatMap that can be used to generated the specific datasets?
>> >>> Doesn't this cause problem to Flink? Classes can be vrry different to
>> >>> each other..maybe this can cause problems with the plan
>> >>> generation..isn't
>> >>> it?
>> >>>
>> >>> Thanks Fabian and Stephan for the support!
>> >>>
>> >>>
>> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen 
>> >>> wrote:
>> 
>>  Why not use an abstract base class and N subclasses?
>> 
>>  On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske 
>>  wrote:
>> >
>> > Unfortunately, there is no Either<1,...,n>.
>> > You could implement something like a Tuple3,
>> > Option, Option>. However, Flink does not provide an
>> > Option
>> > type (comes with Java8). You would need to implement it yourself
>> > incl.
>> > TypeInfo and Serializer. You can get some inspiration from the
>> > Either type
>> > info /serializer, if you want to go this way.
>> >
>> > Using a byte array would also work but doesn't look much easier than
>> > the Option approach to me.
>> >
>> > 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier :
>> >>
>> >> Yes, the intermediate dataset I create then join again between
>> >> themselves. What I'd need is a Either<1,...,n>. Is that possible to
>> >> add?
>> >> Otherwise I was thinking to generate a Tuple2 and in
>> >> the subsequent filter+map/flatMap deserialize only those elements I
>> >> want to
>> >> group togheter (e.g. t.f0=="someEventType") in order to generate
>> >> the typed
>> >> dataset based.
>> >> Which one  do you think is the best solution?
>> >>
>> >> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske 
>> >> wrote:
>> >>>
>> >>> Hi Flavio,
>> >>>
>> >>> I did not completely understand which objects should go where, but
>> >>> here are some general guidelines:
>> >>>
>> >>> - early filtering is mostly a good idea (unless evaluating the
>> >>> filter
>> >>> expression is very expensive)
>> >>> - you can use a flatMap function to combine a map and a filter
>> >>> - applying multiple functions on the same data set does not
>> >>> necessarily materialize the data set (in memory or on disk). In
>> >>> most cases
>> >>> it prevents chaining, hence there is serialization overhead. In
>> >>> some cases
>> >>> where the forked data streams are joined again, the data set must
>> >>> be
>> >>> material

Re: Best way to process data in many files? (FLINK-BATCH)

2016-02-23 Thread Tim Conrad

Hi Till (and others).

Thank you very much for your helpful answer.

On 23.02.2016 14:20, Till Rohrmann wrote:
[...] In contrast, if you had a parallel data source which would 
consist of multiple source task, then these tasks would be independent 
and spread out across your cluster [...]


Can you please send me a link to an example or to the respective Flink 
API doc, where I can see which is a parallel data source and how to 
create it with multiple source tasks?


A simple Google search did not provide me with an answer (maybe I used 
the wrong key words, though...).



Cheers
Tim




On 23.02.2016 14:20, Till Rohrmann wrote:


Hi Tim,

depending on how you create the |DataSource fileList|, Flink 
will schedule the downstream operators differently. If you used the 
|ExecutionEnvironment.fromCollection| method, then it will create a 
|DataSource| with a |CollectionInputFormat|. This kind of |DataSource| 
will only be executed with a degree of parallelism of 1. The source 
will send it’s collection elements in a round robin fashion to the 
downstream operators which are executed with a higher parallelism. So 
when Flink schedules the downstream operators, it will try to place 
them close to their inputs. Since all flat map operators have the 
single data source task as an input, they will be deployed on the same 
machine if possible.


In contrast, if you had a parallel data source which would consist of 
multiple source task, then these tasks would be independent and spread 
out across your cluster. In this case, every flat map task would have 
a single distinct source task as input. When the flat map tasks are 
deployed they would be deployed on the machine where their 
corresponding source is running. Since the source tasks are spread out 
across the cluster, the flat map tasks would be spread out as well.


What you could do to mitigate your problem is to start the cluster 
with as many slots as your maximum degree of parallelism is. That way, 
you’ll utilize all cluster resources.


I hope this clarifies a bit why you observe that tasks tend to cluster 
on a single machine.


Cheers,
Till

​





[no subject]

2016-02-23 Thread Zach Cox
Hi - I typically use the Chrome browser on OS X, and notice that with
1.0.0-rc0 the job graph visualization displays the nodes in the graph, but
not any of the edges. Also the graph does not move around when dragging the
mouse.

The job graph visualization seems to work perfectly in Safari and Firefox
on OS X, however.

Is this a known issue or should I open a jira ticket?

Thanks,
Zach


Re:

2016-02-23 Thread Ufuk Celebi
Hey Zach! I'm not aware of an open issue for this.

You can go ahead and open an issue for it. It will be very helpful to
include the following:
- exact Chrome and OS X version
- the execution plan as JSON (via env.getExecutionPlan(); see the sketch below)
- screenshot
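
For reference, a minimal sketch of how to obtain that JSON (the same call exists
on both ExecutionEnvironment and StreamExecutionEnvironment):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build the job as usual ...
System.out.println(env.getExecutionPlan()); // prints the execution plan as a JSON string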

Thanks!

– Ufuk


On Tue, Feb 23, 2016 at 3:46 PM, Zach Cox  wrote:
> Hi - I typically use the Chrome browser on OS X, and notice that with
> 1.0.0-rc0 the job graph visualization displays the nodes in the graph, but
> not any of the edges. Also the graph does not move around when dragging the
> mouse.
>
> The job graph visualization seems to work perfectly in Safari and Firefox on
> OS X, however.
>
> Is this a known issue or should I open a jira ticket?
>
> Thanks,
> Zach
>


Re: Dataset filter improvement

2016-02-23 Thread Till Rohrmann
Registering a data type is only relevant for the Kryo serializer or if you
want to serialize a subclass of a POJO. Registering has the advantage that
you assign an id to the class which is written instead of the full class
name. The latter is usually much longer than the id.

Cheers,
Till

On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels  wrote:

> Hi Flavio,
>
> I think the point is that Flink can use its serialization tools if you
> register the class in advance. If you don't do that, it will use Kryo
> as a fall-back which is slightly less efficient.
>
> Equals and hash code have to be implemented correctly if you compare
> Pojos. For standard types like String or Integer, this is done
> automatically. For Pojos, Flink doesn't know whether it is implemented
> correctly or not. Every object in Java has a default equals and
> hashCode implementation.
>
> Cheers,
> Max
>
> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>  wrote:
> > Hi Max,
> > why do I need to register them? My job runs without problem also without
> > that.
> > The only problem with my POJOs was that I had to implement equals and
> hash
> > correctly, Flink didn't enforce me to do it but then results were wrong
> :(
> >
> >
> >
> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels 
> wrote:
> >>
> >> Hi Flavio,
> >>
> >> Stephan was referring to
> >>
> >> env.registerType(ExtendedClass1.class);
> >> env.registerType(ExtendedClass2.class);
> >>
> >> Cheers,
> >> Max
> >>
> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
> >>  wrote:
> >> > What do you mean exactly..? Probably I'm missing something
> >> > here..remember
> >> > that I can specify the right subClass only after the last flatMap,
> after
> >> > the
> >> > first map neither me nor Flink can know the exact subclass of
> BaseClass
> >> >
> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen 
> wrote:
> >> >>
> >> >> Class hierarchies should definitely work, even if the base class has
> no
> >> >> fields.
> >> >>
> >> >> They work more efficiently if you register the subclasses at the
> >> >> execution
> >> >> environment (Flink cannot infer them from the function signatures
> >> >> because
> >> >> the function signatures only contain the abstract base class).
> >> >>
> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
> >> >>  wrote:
> >> >>>
> >> >>> Because The classes are not related to each other. Do you think
> it's a
> >> >>> good idea to have something like this?
> >> >>>
> >> >>> abstract class BaseClass(){
> >> >>>String someField;
> >> >>> }
> >> >>>
> >> >>> class ExtendedClass1 extends BaseClass (){
> >> >>>String someOtherField11;
> >> >>>String someOtherField12;
> >> >>>String someOtherField13;
> >> >>>  ...
> >> >>> }
> >> >>>
> >> >>> class ExtendedClass2 extends BaseClass (){
> >> >>>Integer someOtherField21;
> >> >>>Double someOtherField22;
> >> >>>Integer someOtherField23;
> >> >>>  ...
> >> >>> }
> >> >>>
> >> >>> and then declare my map as Map. and then apply a
> >> >>> flatMap that can be used to generated the specific datasets?
> >> >>> Doesn't this cause problem to Flink? Classes can be vrry different
> to
> >> >>> each other..maybe this can cause problems with the plan
> >> >>> generation..isn't
> >> >>> it?
> >> >>>
> >> >>> Thanks Fabian and Stephan for the support!
> >> >>>
> >> >>>
> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen 
> >> >>> wrote:
> >> 
> >>  Why not use an abstract base class and N subclasses?
> >> 
> >>  On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske  >
> >>  wrote:
> >> >
> >> > Unfortunately, there is no Either<1,...,n>.
> >> > You could implement something like a Tuple3,
> >> > Option, Option>. However, Flink does not provide an
> >> > Option
> >> > type (comes with Java8). You would need to implement it yourself
> >> > incl.
> >> > TypeInfo and Serializer. You can get some inspiration from the
> >> > Either type
> >> > info /serializer, if you want to go this way.
> >> >
> >> > Using a byte array would also work but doesn't look much easier
> than
> >> > the Option approach to me.
> >> >
> >> > 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <
> pomperma...@okkam.it>:
> >> >>
> >> >> Yes, the intermediate dataset I create then join again between
> >> >> themselves. What I'd need is a Either<1,...,n>. Is that possible
> to
> >> >> add?
> >> >> Otherwise I was thinking to generate a Tuple2 and
> in
> >> >> the subsequent filter+map/flatMap deserialize only those
> elements I
> >> >> want to
> >> >> group togheter (e.g. t.f0=="someEventType") in order to generate
> >> >> the typed
> >> >> dataset based.
> >> >> Which one  do you think is the best solution?
> >> >>
> >> >> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <
> fhue...@gmail.com>
> >> >> wrote:
> >> >>>
> >> >>> Hi Flavio,
> >> >>>
> >> >>> I did not completely u

Re:

2016-02-23 Thread Zach Cox
Hi Ufuk - here is the jira issue with the requested information:
https://issues.apache.org/jira/browse/FLINK-3483

-Zach


On Tue, Feb 23, 2016 at 8:59 AM Ufuk Celebi  wrote:

> Hey Zach! I'm not aware of an open issue for this.
>
> You can go ahead and open an issue for it. It will be very helpful to
> include the following:
> - exact Chrome and OS X version
> - the exectuion plan as JSON (via env.getExecutionPlan())
> - screenshot
>
> Thanks!
>
> – Ufuk
>
>
> On Tue, Feb 23, 2016 at 3:46 PM, Zach Cox  wrote:
> > Hi - I typically use the Chrome browser on OS X, and notice that with
> > 1.0.0-rc0 the job graph visualization displays the nodes in the graph,
> but
> > not any of the edges. Also the graph does not move around when dragging
> the
> > mouse.
> >
> > The job graph visualization seems to work perfectly in Safari and
> Firefox on
> > OS X, however.
> >
> > Is this a known issue or should I open a jira ticket?
> >
> > Thanks,
> > Zach
> >
>


Re:

2016-02-23 Thread Ufuk Celebi
Thanks! :-)

I hope we can fix it for the release.

On Tue, Feb 23, 2016 at 4:45 PM, Zach Cox  wrote:
> Hi Ufuk - here is the jira issue with the requested information:
> https://issues.apache.org/jira/browse/FLINK-3483
>
> -Zach
>
>
> On Tue, Feb 23, 2016 at 8:59 AM Ufuk Celebi  wrote:
>>
>> Hey Zach! I'm not aware of an open issue for this.
>>
>> You can go ahead and open an issue for it. It will be very helpful to
>> include the following:
>> - exact Chrome and OS X version
>> - the exectuion plan as JSON (via env.getExecutionPlan())
>> - screenshot
>>
>> Thanks!
>>
>> – Ufuk
>>
>>
>> On Tue, Feb 23, 2016 at 3:46 PM, Zach Cox  wrote:
>> > Hi - I typically use the Chrome browser on OS X, and notice that with
>> > 1.0.0-rc0 the job graph visualization displays the nodes in the graph,
>> > but
>> > not any of the edges. Also the graph does not move around when dragging
>> > the
>> > mouse.
>> >
>> > The job graph visualization seems to work perfectly in Safari and
>> > Firefox on
>> > OS X, however.
>> >
>> > Is this a known issue or should I open a jira ticket?
>> >
>> > Thanks,
>> > Zach
>> >


How to use all available task managers

2016-02-23 Thread Saiph Kappa
Hi,

I am running a Flink streaming application on a cluster with 6 slaves/task
managers. I have set "parallelization.degree.default: 6" in the
flink-conf.yaml of every machine. However, when I run my application it
just uses one task slot and not all of them. Am I missing something?

Thanks.


DataStream transformation isolation in Flink Streaming

2016-02-23 Thread Juan Rodríguez Hortalá
Hi,

I was thinking about a problem and how to solve it with Flink Streaming.
Imagine you have a stream of data where you want to apply several
transformations, where some transformations depend on previous
transformations and there is a final set of actions. This is modeled in a
natural way as a DAG which could be implemented in Flink Streaming or Storm
or Spark Streaming. So far this is a typical problem, but imagine that now
I have the requirement that each of the paths in the graph must be able to
fail without affecting the other paths. For example given the following DAG

I -f1 -> A -f2-> B ->a1
  \g1-> C -g2-> D ->a2
  \h1-> E /
 \-h2-> F ->a3

Here I have:
- a single input DataStream I
- several derived DataStreams A, B, C, D, E
- several DataStream transformations f1, f2, g1, g2, h1, h2, each of them
of arity 1, except for g2, which defines D = g2(C, E)
- final data sinks a1, a2, a3

I don't know much about Flink, but I assume that if one of the
transformations starts failing temporarily, then the whole program will
temporarily fail until that transformation goes back to normal. For example,
if h2 starts failing to compute F = h2(E) because h2 has some dependency
that is temporarily unavailable (a database, a service, ...), then there is
no guarantee that B will keep being computed correctly and sending records
to the sink a1, even though no path from I to B involves the failing
transformation, so nothing should prevent those records from being computed.
At least that is what I would expect to happen with Spark Streaming. My
first question is whether this would in fact happen with Flink Streaming too.

Also it would be nice to be able to update the code of each DataStream
transformation independently; I guess you can't, because you have a single
Flink program. Hence if you want to modify the implementation of a single
transformation, even if you are still respecting the transformation's
input-output interface, you have to stop and restart the whole topology.
You could approximate that topology by defining a microservice for each
DataStream transformation and connecting the services with queues. You could
also make this more scalable by using several server instances for each
service behind a per-service load balancer. Or you could use Akka actors or
something like that, which is basically equivalent to these groups of
processes communicating through queues. But then you lose the high-level
programming interface and all the other benefits of Flink, and the system
infrastructure gets way more complicated than a single YARN cluster. I was
wondering if it would be possible to split the DAG into several sub-DAGs,
implement each of those sub-DAGs as a Flink Streaming program, and then
connect those programs without having to use an intermediate external queue
like Kafka, instead using the internal queues used by Flink. In other words,
is it possible to connect several Flink Streaming programs without using an
external queue? That could be an interesting compromise that would allow
different types of modularity (in functions, in different physical
components) and isolation levels.
This is quite a speculative problem, but I think situations like this
are not uncommon in practice.
Thanks for your help in any case.

Greetings,

Juan