Re: nosuchmethoderror

2015-09-03 Thread Robert Metzger
I'm sorry that we changed the method name between minor versions.

We'll soon put some infrastructure in place to a) mark the audience of
classes and b) ensure that public APIs are stable.

On Wed, Sep 2, 2015 at 9:04 PM, Ferenc Turi  wrote:

> Ok. As far as I can see, only the method name was changed. It was an unnecessary
> modification, which caused the incompatibility.
>
> F.
>
> On Wed, Sep 2, 2015 at 8:53 PM, Márton Balassi 
> wrote:
>
>> Dear Ferenc,
>>
>> The Kafka consumer implementation was modified from 0.9.0 to 0.9.1;
>> please use the new code. [1]
>>
>> I suspect that your com.nventdata.kafkaflink.sink.FlinkKafkaTopicWriterSink
>> depends on the way the Flink code used to look in 0.9.0. If you take a
>> closer look, Robert changed the function that is missing in your error in
>> [1].
>>
>> [1]
>> https://github.com/apache/flink/commit/940a7c8a667875b8512b63e4a32453b1a2a58785
>>
>> Best,
>>
>> Márton
>>
>> On Wed, Sep 2, 2015 at 8:47 PM, Ferenc Turi  wrote:
>>
>>> Hi,
>>>
>>> I tried to use the latest 0.9.1 release but I got:
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.flink.util.NetUtils.ensureCorrectHostnamePort(Ljava/lang/String;)V
>>> at
>>> com.nventdata.kafkaflink.sink.FlinkKafkaTopicWriterSink.(FlinkKafkaTopicWriterSink.java:69)
>>> at
>>> com.nventdata.kafkaflink.sink.FlinkKafkaTopicWriterSink.(FlinkKafkaTopicWriterSink.java:48)
>>> at
>>> com.nventdata.kafkaflink.FlinkKafkaTopicWriterMain.main(FlinkKafkaTopicWriterMain.java:54)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>> at org.apache.flink.client.program.Client.run(Client.java:315)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>>>
>>>
>>> Thanks,
>>>
>>> Ferenc
>>>
>>
>>
>
>
> --
> Kind Regards,
>
> Ferenc
>
>
>


Re: Travis updates on Github

2015-09-03 Thread Robert Metzger
I've filed a JIRA at INFRA:
https://issues.apache.org/jira/browse/INFRA-10239

On Wed, Sep 2, 2015 at 11:18 AM, Robert Metzger  wrote:

> Hi Sachin,
>
> I also noticed that the GitHub integration is not working properly. I'll
> ask the Apache Infra team.
>
> On Wed, Sep 2, 2015 at 10:20 AM, Sachin Goel 
> wrote:
>
>> Hi all,
>> Is there some issue with the Travis integration? The last three pull requests
>> do not have their build status on the GitHub page. The builds are getting
>> triggered, though.
>>
>> Regards
>> Sachin
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>>
>
>


Re: Bug broadcasting objects (serialization issue)

2015-09-03 Thread Andres R. Masegosa
Hi,

I get a new, similar bug when broadcasting a list of integers if this
list is made unmodifiable:

elements = Collections.unmodifiableList(elements);


I include this code to reproduce the problem:


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        elements = Collections.unmodifiableList(elements);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}
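
For what it's worth, a workaround that seems to avoid the problem (in line with the observation below that the ArrayList<> constructor works fine) is to copy the elements into a plain java.util.ArrayList before handing them to Flink, so that no non-standard list class such as java.util.Arrays$ArrayList or an unmodifiable wrapper gets serialized. A hedged sketch, with safeCopy as an illustrative name only:

// Hedged workaround sketch: copy the payload into a plain ArrayList so that
// only standard, well-known collection classes are serialized.
List<Integer> unmodifiable = Collections.unmodifiableList(elements);
List<Integer> safeCopy = new ArrayList<Integer>(unmodifiable);
DataSet<TestClass> set = env.fromElements(new TestClass(safeCopy));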

Thanks for your support,
Andres

On 2/9/15 11:17, Andres R. Masegosa  wrote:
> Hi,
> 
> I get a bug when trying to broadcast a list of integers created with
> "Arrays.asList(...)".
> 
> For example, if you try to run this "wordcount" example, you can
> reproduce the bug.
> 
> 
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
> 
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
> 
>         List<Integer> elements = Arrays.asList(0, 0, 0);
> 
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> 
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
> 
>         wordCounts.print();
>     }
> 
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> 
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
> 
>         List<Integer> integerList;
> 
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> 
> 
> However, if instead of using "Arrays.asList(...)" we use the ArrayList<>
> constructor, there is no problem.
> 
> 
> Regards,
> Andres
> 


Re: question on flink-storm-examples

2015-09-03 Thread Matthias J. Sax
One more remark that just came to my mind. There is a storm-hdfs module
available: https://github.com/apache/storm/tree/master/external/storm-hdfs

Maybe you can use it. It would be great if you could give feedback if
this works for you.

-Matthias
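
For reference, a minimal sketch of how a spout could open a file through Hadoop's FileSystem API instead of a plain java.io.FileReader (this is not the actual StormFileSpout code; the class and method below are only an illustration):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration only: resolve the hdfs:// scheme via Hadoop's FileSystem API
// so a spout can read files from HDFS as well as from the local file system.
public class HdfsLineReaderSketch {

    public static BufferedReader open(String uri) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml
        Path path = new Path(uri);                  // e.g. "hdfs://namenode:8020/user/.../data.txt"
        FileSystem fs = path.getFileSystem(conf);   // HDFS or local FileSystem, depending on the URI
        return new BufferedReader(new InputStreamReader(fs.open(path)));
    }
}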

On 09/02/2015 10:52 AM, Matthias J. Sax wrote:
> Hi,
> StormFileSpout uses a simple FileReader internally and cannot deal with
> HDFS. It would be a nice extension to have. I just opened a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-2606
> 
> Jerry, feel free to work on this feature and contribute code to Flink ;)
> 
> -Matthias
> 
> On 09/02/2015 07:52 AM, Aljoscha Krettek wrote:
>> Hi Jerry,
>> unfortunately, it seems that the StormFileSpout can only read files from
>> a local filesystem, not from HDFS. Maybe Matthias has something in the
>> works for that.
>>
>> Regards,
>> Aljoscha
>>
>> On Tue, 1 Sep 2015 at 23:33 Jerry Peng > > wrote:
>>
>> Yes, that's what I did, and everything seems to execute fine, but when I try
>> to run the WordCount-StormTopology with a file on HDFS I get
>> a java.io.FileNotFoundException:
>>
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or
>> directory)
>>
>> at
>> 
>> org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:50)
>>
>> at
>> 
>> org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
>>
>> at
>> 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>>
>> at
>> 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>
>> at
>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.io.FileNotFoundException:
>> /home/jerrypeng/hadoop/hadoop_dir/data/data.txt (No such file or
>> directory)
>>
>> at java.io.FileInputStream.open(Native Method)
>>
>> at java.io.FileInputStream.(FileInputStream.java:138)
>>
>> at java.io.FileInputStream.(FileInputStream.java:93)
>>
>> at java.io.FileReader.(FileReader.java:58)
>>
>> at
>> 
>> org.apache.flink.stormcompatibility.util.StormFileSpout.open(StormFileSpout.java:48)
>>
>>
>>
>> However, I do have that file in my HDFS namespace:
>>
>>
>> $ hadoop fs -ls -R /
>>
>> 15/09/01 21:25:11 WARN util.NativeCodeLoader: Unable to load
>> native-hadoop library for your platform... using builtin-java
>> classes where applicable
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-21 14:40 /home
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-21 14:40
>> /home/jerrypeng
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-21 14:41
>> /home/jerrypeng/hadoop
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-21 14:40
>> /home/jerrypeng/hadoop/dir
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-24 16:06
>> /home/jerrypeng/hadoop/hadoop_dir
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-09-01 20:48
>> /home/jerrypeng/hadoop/hadoop_dir/data
>>
>> -rw-r--r--   3 jerrypeng supergroup  18552 2015-09-01 19:18
>> /home/jerrypeng/hadoop/hadoop_dir/data/data.txt
>>
>> -rw-r--r--   3 jerrypeng supergroup  0 2015-09-01 20:48
>> /home/jerrypeng/hadoop/hadoop_dir/data/result.txt
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-21 14:41
>> /home/jerrypeng/hadoop/hadoop_dir/dir1
>>
>> drwxr-xr-x   - jerrypeng supergroup  0 2015-08-24 15:59
>> /home/jerrypeng/hadoop/hadoop_dir/test
>>
>> -rw-r--r--   3 jerrypeng supergroup 32 2015-08-24 15:59
>> /home/jerrypeng/hadoop/hadoop_dir/test/filename.txt
>>
>>
>> Any idea what's going on?
>>
>>
>> On Tue, Sep 1, 2015 at 4:20 PM, Matthias J. Sax
>> > > wrote:
>>
>> You can use "bin/flink cancel JOBID" or JobManager WebUI to
>> cancel the
>> running job.
>>
>> The exception you see occurs in
>> FlinkSubmitter.killTopology(...), which
>> is not used by "bin/flink cancel" or the JobManager WebUI.
>>
>> If you compile the example yourself, just remove the call to
>> killTopology().
>>
>> -Matthias
>>
>> On 09/01/2015 11:16 PM, Matthias J. Sax wrote:
>> > Oh yes. I forgot about this. I have already a fix for it in a
>> pending
>> > pull request... I hope that this PR is merged soon...
>> >
>> > If you want to observe the progress, look here:
>> > https://issues.apache.org/jira/browse/FLINK-2111
>> > and
>> > https://issues.apache.org/jira/browse/FLINK-23

Flink batch runs OK but Yarn container fails in batch mode with -m yarn-cluster

2015-09-03 Thread LINZ, Arnaud
Hi,



I am wondering why, despite the fact that my Java main() method runs OK and 
exits with a 0 return code, the YARN container status set by the enclosing Flink 
execution is FAILED with the diagnostic "Flink YARN Client requested shutdown."?



Command line :

flink run -m yarn-cluster -yn 20 -ytm 8192 -yqu batch1 -ys 8 --class  
 



End of yarn log :



Status of job 6ac47ddc8331ffd0b1fa9a3b5a551f86 (KUBERA-GEO-BRUT2SEGMENT) 
changed to FINISHED.

10:03:00,618 INFO  org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1   
 - Stopping YARN JobManager with status FAILED and diagnostic Flink YARN Client 
requested shutdown.

10:03:00,625 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 - Waiting for application to be successfully unregistered.

10:03:00,874 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Closing proxy : h1r2dn12.bpa.bouyguestelecom.fr:45454

(… more closing proxy …)

10:03:00,877 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Closing proxy : h1r2dn01.bpa.bouyguestelecom.fr:45454

10:03:00,883 INFO  org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1   
 - Stopping JobManager akka://flink/user/jobmanager#1737010364.

10:03:00,895 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Shutting down remote daemon.

10:03:00,896 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remote daemon shut down; proceeding with flushing remote transports.

10:03:00,918 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
 - Remoting shut down.



End of log4j log:



2015:09:03 10:03:00 (main) - INFO - 
com.bouygtel.kuberasdk.main.Application.mainMethod - Fin ok traitement

2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue - 
Shutting down FlinkYarnCluster from the client shutdown hook

2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue - 
Sending shutdown request to the Application Master

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO - Classe 
Inconnue.Methode Inconnue - Sending StopYarnSession request to 
ApplicationMaster.

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO - Classe 
Inconnue.Methode Inconnue - Remote JobManager has been stopped successfully. 
Stopping local application client

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO - Classe 
Inconnue.Methode Inconnue - Stopped Application client.

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO - Classe 
Inconnue.Methode Inconnue - Shutting down remote daemon.

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO - Classe 
Inconnue.Methode Inconnue - Remote daemon shut down; proceeding with flushing 
remote transports.

2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO - Classe 
Inconnue.Methode Inconnue - Remoting shut down.

2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue - 
Deleting files in 
hdfs://h1r1nn01.bpa.bouyguestelecom.fr:8020/user/datcrypt/.flink/application_1441011714087_0730

2015:09:03 10:03:00 (Thread-15) - INFO - Classe Inconnue.Methode Inconnue - 
Application application_1441011714087_0730 finished with state FINISHED and 
final state FAILED at 1441267380623

2015:09:03 10:03:00 (Thread-14) - WARN - Classe Inconnue.Methode Inconnue - The 
short-circuit local reads feature cannot be used because libhadoop cannot be 
loaded.

2015:09:03 10:03:01 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue - 
YARN Client is shutting down



Greetings,

Arnaud



L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: when use broadcast variable and run on bigdata display this error please help

2015-09-03 Thread Chiwan Park
Hi hagersaleh,

Sorry for the late reply.

I think using an external system could be a solution for large-scale data. To 
use an external system, you have to implement rich functions such as 
RichFilterFunction, RichMapFunction, etc.
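
A minimal sketch of that pattern (ExternalStoreClient and its connect()/lookup()/close() methods are hypothetical placeholders for whatever external system is used):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hedged sketch: look values up in an external store from a rich function
// instead of materializing a huge broadcast variable.
// ExternalStoreClient is a hypothetical placeholder, not a real library.
public class EnrichFromExternalStore extends RichMapFunction<String, String> {

    private transient ExternalStoreClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open the connection once per parallel task, not once per record
        client = ExternalStoreClient.connect("store-host:1234");
    }

    @Override
    public String map(String value) throws Exception {
        return value + "," + client.lookup(value);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}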


Regards,
Chiwan Park


> On Aug 30, 2015, at 1:30 AM, hagersaleh  wrote:
> 
> Are there any ways to use a broadcast variable with big data?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/when-use-broadcast-variable-and-run-on-bigdata-display-this-error-please-help-tp2455p2566.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.






Re: Flink batch runs OK but Yarn container fails in batch mode with -m yarn-cluster

2015-09-03 Thread Robert Metzger
Hi Arnaud,

I think that's a bug ;)
I'll file a JIRA to fix it for the next release.

On Thu, Sep 3, 2015 at 10:26 AM, LINZ, Arnaud 
wrote:

> Hi,
>
>
>
> I am wondering why, despite the fact that my Java main() method runs OK
> and exits with a 0 return code, the YARN container status set by the enclosing
> Flink execution is FAILED with the diagnostic "Flink YARN Client requested
> shutdown."?
>
>
>
> Command line :
>
> flink run -m yarn-cluster -yn 20 -ytm 8192 -yqu batch1 -ys 8 --class
>   
>
>
>
> End of yarn log :
>
>
>
> Status of job 6ac47ddc8331ffd0b1fa9a3b5a551f86 (KUBERA-GEO-BRUT2SEGMENT)
> changed to FINISHED.
>
> 10:03:00,618 INFO
> org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1- Stopping
> YARN JobManager with status FAILED and diagnostic Flink YARN Client
> requested shutdown.
>
> 10:03:00,625 INFO
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for
> application to be successfully unregistered.
>
> 10:03:00,874 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
> Closing proxy : h1r2dn12.bpa.bouyguestelecom.fr:45454
>
> (… more closing proxy …)
>
> 10:03:00,877 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
> Closing proxy : h1r2dn01.bpa.bouyguestelecom.fr:45454
>
> 10:03:00,883 INFO
> org.apache.flink.yarn.ApplicationMaster$$anonfun$2$$anon$1- Stopping
> JobManager akka://flink/user/jobmanager#1737010364.
>
> 10:03:00,895 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting
> down remote daemon.
>
> 10:03:00,896 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> daemon shut down; proceeding with flushing remote transports.
>
> 10:03:00,918 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.
>
>
>
> End of log4j log:
>
>
>
> 2015:09:03 10:03:00 (main) - INFO -
> com.bouygtel.kuberasdk.main.Application.mainMethod - Fin ok traitement
>
> 2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue
> - Shutting down FlinkYarnCluster from the client shutdown hook
>
> 2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue
> - Sending shutdown request to the Application Master
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
> Classe Inconnue.Methode Inconnue - Sending StopYarnSession request to
> ApplicationMaster.
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
> Classe Inconnue.Methode Inconnue - Remote JobManager has been stopped
> successfully. Stopping local application client
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-2) - INFO -
> Classe Inconnue.Methode Inconnue - Stopped Application client.
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO -
> Classe Inconnue.Methode Inconnue - Shutting down remote daemon.
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO -
> Classe Inconnue.Methode Inconnue - Remote daemon shut down; proceeding with
> flushing remote transports.
>
> 2015:09:03 10:03:00 (flink-akka.actor.default-dispatcher-15) - INFO -
> Classe Inconnue.Methode Inconnue - Remoting shut down.
>
> 2015:09:03 10:03:00 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue
> - Deleting files in hdfs://
> h1r1nn01.bpa.bouyguestelecom.fr:8020/user/datcrypt/.flink/application_1441011714087_0730
>
> 2015:09:03 10:03:00 (Thread-15) - INFO - Classe Inconnue.Methode Inconnue
> - Application application_1441011714087_0730 finished with state FINISHED
> and final state FAILED at 1441267380623
>
> 2015:09:03 10:03:00 (Thread-14) - WARN - Classe Inconnue.Methode Inconnue
> - The short-circuit local reads feature cannot be used because libhadoop
> cannot be loaded.
>
> 2015:09:03 10:03:01 (Thread-14) - INFO - Classe Inconnue.Methode Inconnue
> - YARN Client is shutting down
>
>
>
> Greetings,
>
> Arnaud
>
> --
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: Bug broadcasting objects (serialization issue)

2015-09-03 Thread Maximilian Michels
Thanks for clarifying the "eager serialization". By serializing and
deserializing explicitly (eagerly) we can raise better Exceptions to
notify the user of non-serializable classes.
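
A minimal sketch of such an eager round-trip check with plain Java serialization (an illustration of the idea, not the actual Flink client code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustration only: serialize and deserialize eagerly so that serialization
// problems surface at program construction time with a clear error message,
// instead of later when the job is shipped to the cluster.
public final class SerializationRoundTrip {

    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T check(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Object of class "
                    + object.getClass().getName() + " cannot be (de)serialized: " + e, e);
        }
    }
}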

> BTW: There is an opportunity to fix two problems with one patch: The 
> framesize overflow for the input format, and the serialization.

IMHO this adds another layer of complexity to the job submission
phase. I just had a chat with Robert about this. I wonder, is it
possible to increase the Akka framesize only for the Client
ActorSystem?

On Wed, Sep 2, 2015 at 4:27 PM, Stephan Ewen  wrote:
> I see.
>
> Manual serialization implies also manual deserialization (on the workers
> only), which would give a better exception.
>
> BTW: There is an opportunity to fix two problems with one patch: The
> framesize overflow for the input format, and the serialization.
>
> On Wed, Sep 2, 2015 at 4:16 PM, Maximilian Michels  wrote:
>>
>> Ok but that would not prevent the above error, right? Serializing is
>> not the issue here.
>>
>> Nevertheless, it would catch all errors during initial serialization.
>> Deserializing has its own hazards due to possible Classloader issues.
>>
>> On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen  wrote:
>> > Yes, even serialize in the constructor. Then the failure (if
>> > serialization
>> > does not work) comes immediately.
>> >
>> > On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Nice suggestion. So you want to serialize and deserialize the
>> >> InputFormats
>> >> on the Client to check whether they can be transferred correctly?
>> >> Merely
>> >> serializing is not enough because the above Exception occurs during
>> >> deserialization.
>> >>
>> >> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen  wrote:
>> >>>
>> >>> We should try to improve the exception here. More people will run into
>> >>> this issue and the exception should help them understand it well.
>> >>>
>> >>> How about we do eager serialization into a set of byte arrays? Then
>> >>> the
>> >>> serializability issue comes immediately when the program is
>> >>> constructed,
>> >>> rather than later, when it is shipped.
>> >>>
>> >>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels 
>> >>> wrote:
>> 
>>  Here's the JIRA issue:
>>  https://issues.apache.org/jira/browse/FLINK-2608
>> 
>>  On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels 
>>  wrote:
>> >
>> > Hi Andreas,
>> >
>> > Thank you for reporting the problem and including the code to
>> > reproduce
>> > the problem. I think there is a problem with the class serialization
>> > or
>> > deserialization. Arrays.asList uses a private ArrayList class
>> > (java.util.Arrays$ArrayList) which is not the one you would normally
>> > use
>> > (java.util.ArrayList).
>> >
>> > I'll create a JIRA issue to keep track of the problem and to
>> > investigate further.
>> >
>> > Best regards,
>> > Max
>> >
>> > Here's the stack trace:
>> >
>> > Exception in thread "main"
>> > org.apache.flink.runtime.client.JobExecutionException: Cannot
>> > initialize
>> > task 'DataSource (at main(Test.java:32)
>> > (org.apache.flink.api.java.io.CollectionInputFormat))':
>> > Deserializing the
>> > InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block
>> > data
>> > at
>> >
>> > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>> > at
>> >
>> > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > at
>> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> > at
>> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> > at
>> >
>> > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>> > at
>> >
>> > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>> > at
>> >
>> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> > at
>> >
>> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> > at
>> >
>> > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> > at
>> >
>> > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>> > at
>> >
>> > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>> > at
>>

Re: max-fan

2015-09-03 Thread Stephan Ewen
Hi Greg!

That number should control the merge fan-in, yes. Maybe a bug was
introduced a while back that prevents this parameter from being properly
passed through the system. Have you modified the config value in the
cluster, on the client, or are you starting the job via the command line,
in which case both are the same? In any case, we'll fix that soon,
definitely. Could you open an issue for that?


Concerning the sub-optimal merging: you are right, this could be improved
as you said. Right now, the attempt is to create uniform files, but your
suggestion would be more efficient.
The relevant part of the code is here:
https://github.com/StephanEwen/incubator-flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java#L1400

Is this a critical issue for you? Would you be up for making a patch for
this? It should be a fairly isolated change.


Greetings,
Stephan


On Thu, Sep 3, 2015 at 3:02 AM, Greg Hogan  wrote:

> When workers spill more than 128 files, I have seen these fully merged
> into one or more much larger files. Does the following parameter allow more
> files to be stored without requiring the intermediate merge-sort? I have
> changed it to 1024 without effect. Also, it appears that the entire set of
> small files is reprocessed rather than the minimum required to attain the
> max fan-in (i.e., starting with 150 files, 23 would be merged leaving 128
> to be processed concurrently).
>
> taskmanager.runtime.max-fan: The maximal fan-in for external merge joins
> and fan-out for spilling hash tables. Limits the number of file handles per
> operator, but may cause intermediate merging/partitioning, if set too small
> (DEFAULT: 128).
>
> Greg Hogan
>


Splitting Streams

2015-09-03 Thread Martin Neumann
Hej,

I have a stream of JSON objects of several different types. I want to split
this stream into several streams, each of them dealing with one type (so
it's not partitioning).

The only way I have found so far is writing a bunch of filters and connecting them
to the source directly. This way I will have a lot of duplicated logic.
I'm basically looking for a single operator that can have multiple output
streams of different types.

cheers Martin


Re: Splitting Streams

2015-09-03 Thread Till Rohrmann
Hi Martin,

could grouping be a solution to your problem?

Cheers,
Till

On Thu, Sep 3, 2015 at 11:56 AM, Martin Neumann  wrote:

> Hej,
>
> I have a Stream of json objects of several different types. I want to
> split this stream into several streams each of them dealing with one type.
> (so its not partitioning)
>
> The only Way I found so far is writing a bunch of filters and connect them
> to the source directly. This way I will have a lot of duplicated logic.
> I'm basically looking for a single operator that can have multiple output
> streams of different types.
>
> cheers Martin
>


Re: Splitting Streams

2015-09-03 Thread Aljoscha Krettek
Hi Martin,
maybe this is what you are looking for:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#output-splitting

Regards,
Aljoscha

On Thu, 3 Sep 2015 at 12:02 Till Rohrmann  wrote:

> Hi Martin,
>
> could grouping be a solution to your problem?
>
> Cheers,
> Till
>
> On Thu, Sep 3, 2015 at 11:56 AM, Martin Neumann  wrote:
>
>> Hej,
>>
>> I have a Stream of json objects of several different types. I want to
>> split this stream into several streams each of them dealing with one type.
>> (so its not partitioning)
>>
>> The only Way I found so far is writing a bunch of filters and connect
>> them to the source directly. This way I will have a lot of duplicated logic.
>> I'm basically looking for a single operator that can have multiple output
>> streams of different types.
>>
>> cheers Martin
>>
>
>


Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
Hi!

Testing it with the current 0.10 snapshot is not easily possible atm

But I deactivated checkpointing in my program and still get duplicates in my 
output. So it seems it does not come only from the checkpointing feature, right?

May be the KafkaSink is responsible for this? (Just my guess)

Cheers Rico. 



> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek :
> 
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly once 
> processing and checkpointed operators. We reworked how the checkpoints are 
> handled for the 0.10 release so it should work well there. 
> 
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
> problems persist there?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann  
>> wrote:
>> Hi!
>> 
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>> 
>>  final FlinkKafkaConsumer082 kafkaSrc = new
>> FlinkKafkaConsumer082(
>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>> 
>>  DataStream start = env.addSource(kafkaSrc)
>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>> 
>>  DataStream jsonized = start
>>  .flatMap(new ExtractAndFilterJSON());
>> 
>>  DataStream sessions = jsonized
>>  .partitionByHash(new KeySelector() {
>>  /**
>>   * partition by session id
>>   */
>>  @Override
>>  public String getKey(JSONObject value)
>>  throws Exception {
>>  try {
>>  return /*session id*/;
>>  } catch (Exception e) {
>>  LOG.error("no session could be retrieved", e);
>>  }
>>  return "";
>>  }
>>  }).flatMap(new StatefulSearchSessionizer());
>> 
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>> 
>> Any ideas?
>> 
>> Cheers, Rico.


Re: Splitting Streams

2015-09-03 Thread Martin Neumann
@Till:
The fields are hidden inside the JSON string, so I have to deserialize it
first. Also, the classes do not have much in common. It might be possible
to do it with a hierarchy of group-bys, but I'm not sure how complicated that
would be. Additionally, I would have to send the raw String around for longer,
and it might be harder to extend later.

@Aljoscha:
The problem with split is that the channels it splits into all have to
have the same type. In my case the input would be a String and the outputs
would be object streams of different types.

I think for now I will try to split the String stream into several
sub-streams using the split operator and then do the type conversion
afterwards. I will still have to distribute the logic over several
operators this way, but it's less ugly than my current solution.
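
A rough sketch of that plan (hedged: the output-splitting class names follow the 0.10-era streaming guide and may differ between Flink versions; extractType(), EventA/EventB and the Parse* mappers are hypothetical placeholders):

// Hedged sketch: route raw JSON strings by type with split()/select(), then
// parse each sub-stream into its own type. Helper names are hypothetical.
SplitDataStream<String> routed = rawJson.split(new OutputSelector<String>() {
    @Override
    public Iterable<String> select(String json) {
        // assign each record to exactly one named output based on its type field
        return Collections.singletonList(extractType(json));
    }
});

DataStream<EventA> aEvents = routed.select("A").map(new ParseEventA());
DataStream<EventB> bEvents = routed.select("B").map(new ParseEventB());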

cheers Martin


On Thu, Sep 3, 2015 at 12:20 PM, Aljoscha Krettek 
wrote:

> Hi Martin,
> maybe this is what you are looking for:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#output-splitting
>
> Regards,
> Aljoscha
>
> On Thu, 3 Sep 2015 at 12:02 Till Rohrmann  wrote:
>
>> Hi Martin,
>>
>> could grouping be a solution to your problem?
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 3, 2015 at 11:56 AM, Martin Neumann  wrote:
>>
>>> Hej,
>>>
>>> I have a Stream of json objects of several different types. I want to
>>> split this stream into several streams each of them dealing with one type.
>>> (so its not partitioning)
>>>
>>> The only Way I found so far is writing a bunch of filters and connect
>>> them to the source directly. This way I will have a lot of duplicated logic.
>>> I'm basically looking for a single operator that can have multiple
>>> output streams of different types.
>>>
>>> cheers Martin
>>>
>>
>>


Re: Duplicates in Flink

2015-09-03 Thread Stephan Ewen
Do you mean the KafkaSource?

Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
KafkaSource?

On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann  wrote:

> Hi!
>
> Testing it with the current 0.10 snapshot is not easily possible atm
>
> But I deactivated checkpointing in my program and still get duplicates in
> my output. So it seems not only to come from the checkpointing feature, or?
>
> May be the KafkaSink is responsible for this? (Just my guess)
>
> Cheers Rico.
>
>
>
> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek :
>
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly
> once processing and checkpointed operators. We reworked how the checkpoints
> are handled for the 0.10 release so it should work well there.
>
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
> problems persist there?
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
> wrote:
>
>> Hi!
>>
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>>
>>  final FlinkKafkaConsumer082 kafkaSrc = new
>> FlinkKafkaConsumer082(
>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>>
>>  DataStream start = env.addSource(kafkaSrc)
>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>>
>>  DataStream jsonized = start
>>  .flatMap(new ExtractAndFilterJSON());
>>
>>  DataStream sessions = jsonized
>>  .partitionByHash(new KeySelector() {
>>  /**
>>   * partition by session id
>>   */
>>  @Override
>>  public String getKey(JSONObject value)
>>  throws Exception {
>>  try {
>>  return /*session id*/;
>>  } catch (Exception e) {
>>  LOG.error("no session could be retrieved", e);
>>  }
>>  return "";
>>  }
>>  }).flatMap(new StatefulSearchSessionizer());
>>
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>>
>> Any ideas?
>>
>> Cheers, Rico.
>>
>>


Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
No. I mean the KafkaSink. 

A bit more insight into my program: I read from a Kafka topic with 
FlinkKafkaConsumer082, then hash-partition the data, then I do a deduplication 
(it does not eliminate all duplicates though). Then some computation, followed 
again by deduplication (group by message in a window of the last 2 seconds). 

Of course the last deduplication is not perfect.

Cheers. Rico. 



> Am 03.09.2015 um 13:29 schrieb Stephan Ewen :
> 
> Do you mean the KafkaSource?
> 
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
> KafkaSource?
> 
>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann  wrote:
>> Hi!
>> 
>> Testing it with the current 0.10 snapshot is not easily possible atm
>> 
>> But I deactivated checkpointing in my program and still get duplicates in my 
>> output. So it seems not only to come from the checkpointing feature, or?
>> 
>> May be the KafkaSink is responsible for this? (Just my guess)
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek :
>>> 
>>> Hi Rico,
>>> unfortunately the 0.9 branch still seems to have problems with exactly once 
>>> processing and checkpointed operators. We reworked how the checkpoints are 
>>> handled for the 0.10 release so it should work well there. 
>>> 
>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
>>> problems persist there?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
 On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
  wrote:
 Hi!
 
 I still have an issue... I was now using 0.9.1 and the new
 KafkaConnector. But I still get duplicates in my flink prog. Here's the
 relevant part:
 
  final FlinkKafkaConsumer082 kafkaSrc = new
 FlinkKafkaConsumer082(
  kafkaTopicIn, new SimpleStringSchema(), properties);
 
  DataStream start = env.addSource(kafkaSrc)
  .setParallelism(numReadPartitions); //numReadPartitions = 2
 
  DataStream jsonized = start
  .flatMap(new ExtractAndFilterJSON());
 
  DataStream sessions = jsonized
  .partitionByHash(new KeySelector() {
  /**
   * partition by session id
   */
  @Override
  public String getKey(JSONObject value)
  throws Exception {
  try {
  return /*session id*/;
  } catch (Exception e) {
  LOG.error("no session could be retrieved", e);
  }
  return "";
  }
  }).flatMap(new StatefulSearchSessionizer());
 
 In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
 sure that the kafka topic I'm reading from does not contain any
 duplicates. So it must be in the flink program ...
 
 Any ideas?
 
 Cheers, Rico.
> 


Re: Duplicates in Flink

2015-09-03 Thread Stephan Ewen
Can you tell us where the KafkaSink comes into play? At what point do the
duplicates come up?

On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann  wrote:

> No. I mean the KafkaSink.
>
> A bit more insight to my program: I read from a Kafka topic with
> flinkKafkaConsumer082, then hashpartition the data, then I do a
> deduplication (does not eliminate all duplicates though). Then some
> computation, afterwards again deduplication (group by message in a window
> of last 2 seconds).
>
> Of course the last deduplication is not perfect.
>
> Cheers. Rico.
>
>
>
> Am 03.09.2015 um 13:29 schrieb Stephan Ewen :
>
> Do you mean the KafkaSource?
>
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
> KafkaSource?
>
> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann 
> wrote:
>
>> Hi!
>>
>> Testing it with the current 0.10 snapshot is not easily possible atm
>>
>> But I deactivated checkpointing in my program and still get duplicates in
>> my output. So it seems not only to come from the checkpointing feature, or?
>>
>> May be the KafkaSink is responsible for this? (Just my guess)
>>
>> Cheers Rico.
>>
>>
>>
>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek :
>>
>> Hi Rico,
>> unfortunately the 0.9 branch still seems to have problems with exactly
>> once processing and checkpointed operators. We reworked how the checkpoints
>> are handled for the 0.10 release so it should work well there.
>>
>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>> problems persist there?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Hi!
>>>
>>> I still have an issue... I was now using 0.9.1 and the new
>>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>>> relevant part:
>>>
>>>  final FlinkKafkaConsumer082 kafkaSrc = new
>>> FlinkKafkaConsumer082(
>>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>>>
>>>  DataStream start = env.addSource(kafkaSrc)
>>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>
>>>  DataStream jsonized = start
>>>  .flatMap(new ExtractAndFilterJSON());
>>>
>>>  DataStream sessions = jsonized
>>>  .partitionByHash(new KeySelector() {
>>>  /**
>>>   * partition by session id
>>>   */
>>>  @Override
>>>  public String getKey(JSONObject value)
>>>  throws Exception {
>>>  try {
>>>  return /*session id*/;
>>>  } catch (Exception e) {
>>>  LOG.error("no session could be retrieved", e);
>>>  }
>>>  return "";
>>>  }
>>>  }).flatMap(new StatefulSearchSessionizer());
>>>
>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>> sure that the kafka topic I'm reading from does not contain any
>>> duplicates. So it must be in the flink program ...
>>>
>>> Any ideas?
>>>
>>> Cheers, Rico.
>>>
>>>
>


Re: Hardware requirements and learning resources

2015-09-03 Thread Stefan Winterstein

> Answering to myself, I have found some nice training material at
> http://dataartisans.github.io/flink-training. 

Excellent resources! Somehow, I managed not to stumble over them by
myself - either I was blind, or they are well hidden... :)


Best,
-Stefan



Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )

2015-09-03 Thread Stephan Ewen
In a set of benchmarks a while back, we found that the chaining mechanism
has some overhead right now, because of its abstraction. The abstraction
creates iterators for each element and makes it hard for the JIT to
specialize on the operators in the chain.

For purely local chains at full speed, this overhead is observable (it can
decrease throughput from about 25 million elements per core to 15-20 million elements per
core). If your job does not reach that throughput, or is I/O bound, source
bound, etc, it does not matter.

If you care about super high performance, collapsing the code into one
function helps.
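
For completeness, a hedged sketch of what collapsing filter(...).map(...) into a single flatMap looks like (Event, Result, keep() and transform() are illustrative placeholders for the user's own types and logic):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hedged sketch: one flatMap doing the work of filter(...).map(...).
public class FilterAndTransform implements FlatMapFunction<Event, Result> {

    @Override
    public void flatMap(Event value, Collector<Result> out) {
        if (keep(value)) {                  // the former filter predicate
            out.collect(transform(value));  // the former map logic
        }
        // otherwise emit nothing, just like the filter would have done
    }

    private boolean keep(Event value) {
        return true;           // placeholder for the real predicate
    }

    private Result transform(Event value) {
        return new Result();   // placeholder for the real transformation
    }
}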

On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan  wrote:

> Hi Gyula,
>
> Thanks for your response. It seems I will use filter and map for now, as that
> really makes the intention clear and is not a big performance hit.
>
> Thanks again.
>
> Cheers
>
> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra  wrote:
>
>> Hey Welly,
>>
>> If you call filter and map one after the other like you mentioned, these
>> operators will be chained and executed as if they were running in the same
>> operator.
>> The only small performance overhead comes from the fact that the output
>> of the filter will be copied before passing it as input to the map to keep
>> immutability guarantees (but no serialization/deserialization will happen).
>> Copying might be practically free depending on your data type, though.
>>
>> If you are using operators that don't make use of the immutability of
>> inputs/outputs (i.e you don't hold references to those values) than you can
>> disable copying altogether by calling env.getConfig().enableObjectReuse(),
>> in which case they will have exactly the same performance.
>>
>> Cheers,
>> Gyula
>>
>> Welly Tambunan  ezt írta (időpont: 2015. szept. 3.,
>> Cs, 4:33):
>>
>>> Hi All,
>>>
>>> I would like to filter some item from the event stream. I think there
>>> are two ways doing this.
>>>
>>> Using the regular pipeline filter(...).map(...). We can also use flatMap
>>> for doing both in the same operator.
>>>
>>> Any performance improvement if we are using flatMap ? As that will be
>>> done in one operator instance.
>>>
>>>
>>> Cheers
>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Duplicates in Flink

2015-09-03 Thread Rico Bergmann
The KafkaSink is the last step in my program after the 2nd deduplication. 

I could not yet track down where duplicates show up. That's a bit difficult to 
find out... But I'm trying to find it...



> Am 03.09.2015 um 14:14 schrieb Stephan Ewen :
> 
> Can you tell us where the KafkaSink comes into play? At what point do the 
> duplicates come up?
> 
>> On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann  wrote:
>> No. I mean the KafkaSink. 
>> 
>> A bit more insight to my program: I read from a Kafka topic with 
>> flinkKafkaConsumer082, then hashpartition the data, then I do a 
>> deduplication (does not eliminate all duplicates though). Then some 
>> computation, afterwards again deduplication (group by message in a window of 
>> last 2 seconds). 
>> 
>> Of course the last deduplication is not perfect.
>> 
>> Cheers. Rico. 
>> 
>> 
>> 
>>> Am 03.09.2015 um 13:29 schrieb Stephan Ewen :
>>> 
>>> Do you mean the KafkaSource?
>>> 
>>> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
>>> KafkaSource?
>>> 
 On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann  wrote:
 Hi!
 
 Testing it with the current 0.10 snapshot is not easily possible atm
 
 But I deactivated checkpointing in my program and still get duplicates in 
 my output. So it seems not only to come from the checkpointing feature, or?
 
 May be the KafkaSink is responsible for this? (Just my guess)
 
 Cheers Rico. 
 
 
 
> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek :
> 
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly 
> once processing and checkpointed operators. We reworked how the 
> checkpoints are handled for the 0.10 release so it should work well 
> there. 
> 
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
> problems persist there?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
>>  wrote:
>> Hi!
>> 
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>> 
>>  final FlinkKafkaConsumer082 kafkaSrc = new
>> FlinkKafkaConsumer082(
>>  kafkaTopicIn, new SimpleStringSchema(), properties);
>> 
>>  DataStream start = env.addSource(kafkaSrc)
>>  .setParallelism(numReadPartitions); //numReadPartitions = 2
>> 
>>  DataStream jsonized = start
>>  .flatMap(new ExtractAndFilterJSON());
>> 
>>  DataStream sessions = jsonized
>>  .partitionByHash(new KeySelector() {
>>  /**
>>   * partition by session id
>>   */
>>  @Override
>>  public String getKey(JSONObject value)
>>  throws Exception {
>>  try {
>>  return /*session id*/;
>>  } catch (Exception e) {
>>  LOG.error("no session could be retrieved", e);
>>  }
>>  return "";
>>  }
>>  }).flatMap(new StatefulSearchSessionizer());
>> 
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>> 
>> Any ideas?
>> 
>> Cheers, Rico.
> 


Re: How to force the parallelism on small streams?

2015-09-03 Thread Aljoscha Krettek
Hi,
I don't think it's a bug. If there are 100 sources that each emit only 14
elements then only the first 14 mappers will ever receive data. The
round-robin distribution is not global, since the sources operate
independently from each other.

Cheers,
Aljoscha

On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax  wrote:

> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
> it redistributes randomly and not in round robin fashion.
>
> However, the problem you describe sounds like a bug to me. I included
> dev list. Maybe anyone else can step in so we can identify it there is a
> bug or not.
>
> -Matthias
>
>
> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > Hi,
> >
> > You are right, but in fact it does not solve my problem, since I have
> 100 parallelism everywhere. Each of my 100 sources gives only a few lines
> (say 14 max), and only the first 14 next nodes will receive data.
> > Same problem by replacing rebalance() with shuffle().
> >
> > But I found a workaround: setting parallelism to 1 for the source (I
> don't need a 100 directory scanners anyway), it forces the rebalancing
> evenly between the mappers.
> >
> > Greetings,
> > Arnaud
> >
> >
> > -Message d'origine-
> > De : Matthias J. Sax [mailto:mj...@apache.org]
> > Envoyé : mercredi 2 septembre 2015 17:56
> > À : user@flink.apache.org
> > Objet : Re: How to force the parallelism on small streams?
> >
> > Hi,
> >
> > If I understand you correctly, you want to have 100 mappers. Thus you
> need to apply the .setParallelism() after .map()
> >
> >> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
> >> 00)
> >
> > The order of commands you used, set the dop for the source to 100 (which
> might be ignored, if the provided source function "myFileSource" does not
> implements "ParallelSourceFunction" interface). The dop for the mapper
> should be the default value.
> >
> > Using .rebalance() is absolutely correct. It distributes the emitted
> tuples in a round robin fashion to all consumer tasks.
> >
> > -Matthias
> >
> > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> >> Hi,
> >>
> >>
> >>
> >> I have a source that provides few items, since it gives file names to
> >> the mappers. The mapper opens the file and processes records. As the
> >> files are huge, one input line (a filename) represents a substantial amount of work for
> the next stage.
> >>
> >> My topology looks like :
> >>
> >> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
> >> er)
> >>
> >> If 100 mappers are created, about 85 end immediately and only a few
> >> process the files (for hours). I suspect an optimization making that
> >> there is a minimum number of lines to pass to the next node or it is
> >> “shutdown” ; but in my case I do want the lines to be evenly
> >> distributed to each mapper.
> >>
> >> How to enforce that ?
> >>
> >>
> >>
> >> Greetings,
> >>
> >> Arnaud
> >>
> >>
> >> --
> >> --
> >>
> >> L'intégrité de ce message n'étant pas assurée sur internet, la société
> >> expéditrice ne peut être tenue responsable de son contenu ni de ses
> >> pièces jointes. Toute utilisation ou diffusion non autorisée est
> >> interdite. Si vous n'êtes pas destinataire de ce message, merci de le
> >> détruire et d'avertir l'expéditeur.
> >>
> >> The integrity of this message cannot be guaranteed on the Internet.
> >> The company that sent this message cannot therefore be held liable for
> >> its content nor attachments. Any unauthorized use or dissemination is
> >> prohibited. If you are not the intended recipient of this message,
> >> then please delete it and notify the sender.
> >
>
>


Re: How to force the parallelism on small streams?

2015-09-03 Thread Matthias J. Sax
If it would be only 14 elements, you are obviously right. However, if I
understood Arnaud correctly, the problem is, that there are more than 14
elements:

> Each of my 100 sources gives only a few lines (say 14 max)

That would be about 140 lines in total.

Using non-parallel source, he is able to distribute the elements to all
100 mappers. I assume that about 40 mappers receive 2 lines, and 60
receive 1 line.

@Arnaud: is this correct?


-Matthias

On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> Hi,
> I don't think it's a bug. If there are 100 sources that each emit only
> 14 elements then only the first 14 mappers will ever receive data. The
> round-robin distribution is not global, since the sources operate
> independently from each other.
> 
> Cheers,
> Aljoscha
> 
> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax  > wrote:
> 
> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
> it redistributes randomly and not in round robin fashion.
> 
> However, the problem you describe sounds like a bug to me. I included
> dev list. Maybe anyone else can step in so we can identify it there is a
> bug or not.
> 
> -Matthias
> 
> 
> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > Hi,
> >
> > You are right, but in fact it does not solve my problem, since I
> have 100 parallelism everywhere. Each of my 100 sources gives only a
> few lines (say 14 max), and only the first 14 next nodes will
> receive data.
> > Same problem by replacing rebalance() with shuffle().
> >
> > But I found a workaround: setting parallelism to 1 for the source
> (I don't need a 100 directory scanners anyway), it forces the
> rebalancing evenly between the mappers.
> >
> > Greetings,
> > Arnaud
> >
> >
> > -Message d'origine-
> > De : Matthias J. Sax [mailto:mj...@apache.org
> ]
> > Envoyé : mercredi 2 septembre 2015 17:56
> > À : user@flink.apache.org 
> > Objet : Re: How to force the parallelism on small streams?
> >
> > Hi,
> >
> > If I understand you correctly, you want to have 100 mappers. Thus
> you need to apply the .setParallelism() after .map()
> >
> >>
> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
> >> 00)
> >
> > The order of commands you used, set the dop for the source to 100
> (which might be ignored, if the provided source function
> "myFileSource" does not implements "ParallelSourceFunction"
> interface). The dop for the mapper should be the default value.
> >
> > Using .rebalance() is absolutely correct. It distributes the
> emitted tuples in a round robin fashion to all consumer tasks.
> >
> > -Matthias
> >
> > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> >> Hi,
> >>
> >>
> >>
> >> I have a source that provides few items since it gives file names to
> >> the mappers. The mapper opens the file and process records. As the
> >> files are huge, one input line (a filename) gives a consequent
> work to the next stage.
> >>
> >> My topology looks like :
> >>
> >>
> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
> >> er)
> >>
> >> If 100 mappers are created, about 85 end immediately and only a few
> >> process the files (for hours). I suspect an optimization making that
> >> there is a minimum number of lines to pass to the next node or it is
> >> “shutdown” ; but in my case I do want the lines to be evenly
> >> distributed to each mapper.
> >>
> >> How to enforce that ?
> >>
> >>
> >>
> >> Greetings,
> >>
> >> Arnaud
> >>
> >>
> >>
> --
> >> --
> >>
> >> L'intégrité de ce message n'étant pas assurée sur internet, la
> société
> >> expéditrice ne peut être tenue responsable de son contenu ni de ses
> >> pièces jointes. Toute utilisation ou diffusion non autorisée est
> >> interdite. Si vous n'êtes pas destinataire de ce message, merci de le
> >> détruire et d'avertir l'expéditeur.
> >>
> >> The integrity of this message cannot be guaranteed on the Internet.
> >> The company that sent this message cannot therefore be held
> liable for
> >> its content nor attachments. Any unauthorized use or dissemination is
> >> prohibited. If you are not the intended recipient of this message,
> >> then please delete it and notify the sender.
> >
> 





Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
Btw, it is working with a parallelism 1 source, because only a single
source partitions (round-robin or random) the data.
Several sources do not assign work to the same few mappers.
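
A hedged sketch of the workaround Arnaud describes (myFileSource and myFileMapper are the names from his example):

// Hedged sketch: a single, non-parallel source emits the file names and
// rebalance() round-robins them from that one sender across all 100 mappers,
// so every mapper receives work.
env.addSource(myFileSource).setParallelism(1)
   .rebalance()
   .map(myFileMapper).setParallelism(100);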

2015-09-03 15:22 GMT+02:00 Matthias J. Sax :

> If it would be only 14 elements, you are obviously right. However, if I
> understood Arnaud correctly, the problem is, that there are more than 14
> elements:
>
> > Each of my 100 sources gives only a few lines (say 14 max)
>
> That would be about 140 lines in total.
>
> Using non-parallel source, he is able to distribute the elements to all
> 100 mappers. I assume that about 40 mappers receive 2 lines, and 60
> receive 1 line.
>
> @Arnaud: is this correct?
>
>
> -Matthias
>
> On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> > Hi,
> > I don't think it's a bug. If there are 100 sources that each emit only
> > 14 elements then only the first 14 mappers will ever receive data. The
> > round-robin distribution is not global, since the sources operate
> > independently from each other.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax  > > wrote:
> >
> > Thanks for clarifying. shuffle() is similar to rebalance() --
> however,
> > it redistributes randomly and not in round robin fashion.
> >
> > However, the problem you describe sounds like a bug to me. I included
> > dev list. Maybe anyone else can step in so we can identify it there
> is a
> > bug or not.
> >
> > -Matthias
> >
> >
> > On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > > Hi,
> > >
> > > You are right, but in fact it does not solve my problem, since I
> > have 100 parallelism everywhere. Each of my 100 sources gives only a
> > few lines (say 14 max), and only the first 14 next nodes will
> > receive data.
> > > Same problem by replacing rebalance() with shuffle().
> > >
> > > But I found a workaround: setting parallelism to 1 for the source
> > (I don't need a 100 directory scanners anyway), it forces the
> > rebalancing evenly between the mappers.
> > >
> > > Greetings,
> > > Arnaud
> > >
> > >
> > > -Message d'origine-
> > > De : Matthias J. Sax [mailto:mj...@apache.org
> > ]
> > > Envoyé : mercredi 2 septembre 2015 17:56
> > > À : user@flink.apache.org 
> > > Objet : Re: How to force the parallelism on small streams?
> > >
> > > Hi,
> > >
> > > If I understand you correctly, you want to have 100 mappers. Thus
> > you need to apply the .setParallelism() after .map()
> > >
> > >>
> >
>  addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
> > >> 00)
> > >
> > > The order of commands you used, set the dop for the source to 100
> > (which might be ignored, if the provided source function
> > "myFileSource" does not implements "ParallelSourceFunction"
> > interface). The dop for the mapper should be the default value.
> > >
> > > Using .rebalance() is absolutely correct. It distributes the
> > emitted tuples in a round robin fashion to all consumer tasks.
> > >
> > > -Matthias
> > >
> > > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> > >> Hi,
> > >>
> > >>
> > >>
> > >> I have a source that provides few items since it gives file names
> to
> > >> the mappers. The mapper opens the file and process records. As the
> > >> files are huge, one input line (a filename) gives a consequent
> > work to the next stage.
> > >>
> > >> My topology looks like :
> > >>
> > >>
> >
>  addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
> > >> er)
> > >>
> > >> If 100 mappers are created, about 85 end immediately and only a
> few
> > >> process the files (for hours). I suspect an optimization making
> that
> > >> there is a minimum number of lines to pass to the next node or it
> is
> > >> “shutdown” ; but in my case I do want the lines to be evenly
> > >> distributed to each mapper.
> > >>
> > >> How to enforce that ?
> > >>
> > >>
> > >>
> > >> Greetings,
> > >>
> > >> Arnaud
> > >>
> > >>
> > >>
> >

Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
In case of rebalance(), all sources start the round-robin partitioning at
index 0. Since each source emits only very few elements, only the first 15
mappers receive any input.
It would be better to let each source start the round-robin partitioning at
a different index, something like startIdx = (numReceivers / numSenders) *
myIdx.

In case of shuffle(), the ShufflePartitioner initializes Random() without a
seed (the current time is taken).
However, the ShufflePartitioner is only initialized once at the client side
(if I see that correctly) and then the same instance is deserialized by all
operators, i.e., all use random number generators with the same seed.
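
A small, plain-Java illustration of that effect (only a sketch, not Flink
code): if the same Random instance is serialized once and then deserialized
in every parallel task, all tasks draw exactly the same sequence of channel
indices.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Random;

public class SharedSeedDemo {
    public static void main(String[] args) throws Exception {
        Random clientSide = new Random(); // seeded once (with the current time) at the client

        // Simulate shipping the same instance to two parallel tasks via serialization.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(clientSide);
        oos.close();
        byte[] bytes = bos.toByteArray();

        Random task1 = (Random) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
        Random task2 = (Random) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();

        // Both "tasks" now pick identical pseudo-random channel indices.
        for (int i = 0; i < 5; i++) {
            System.out.println(task1.nextInt(100) + " == " + task2.nextInt(100));
        }
    }
}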

I think, the StreamPartitioner class should be extended with a
configuration / initialize method which is called on each parallel operator.

Btw, it is working with a parallelism 1 source because a single source
partitions the data (round-robin or random) across all receivers.
With several sources, each one partitions independently and only ever
assigns work to the same few mappers.
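
For reference, a minimal sketch of the workaround Arnaud describes below
(myFileSource and myFileMapper are just the placeholders from this thread):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(myFileSource).setParallelism(1) // a single, non-parallel file-name source
   .rebalance()                               // one round-robin partitioner now covers all mappers
   .map(myFileMapper).setParallelism(100);    // the 100 mappers each get a fair share of files

env.execute("file processing");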

Cheers, Fabian

2015-09-03 15:22 GMT+02:00 Matthias J. Sax :

> If it would be only 14 elements, you are obviously right. However, if I
> understood Arnaud correctly, the problem is, that there are more than 14
> elements:
>
> > Each of my 100 sources gives only a few lines (say 14 max)
>
> That would be about 140 lines in total.
>
> Using non-parallel source, he is able to distribute the elements to all
> 100 mappers. I assume that about 40 mappers receive 2 lines, and 60
> receive 1 line.
>
> @Arnaud: is this correct?
>
>
> -Matthias
>
> On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> > Hi,
> > I don't think it's a bug. If there are 100 sources that each emit only
> > 14 elements then only the first 14 mappers will ever receive data. The
> > round-robin distribution is not global, since the sources operate
> > independently from each other.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax  > > wrote:
> >
> > Thanks for clarifying. shuffle() is similar to rebalance() --
> however,
> > it redistributes randomly and not in round robin fashion.
> >
> > However, the problem you describe sounds like a bug to me. I included
> > dev list. Maybe anyone else can step in so we can identify it there
> is a
> > bug or not.
> >
> > -Matthias
> >
> >
> > On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > > Hi,
> > >
> > > You are right, but in fact it does not solve my problem, since I
> > have 100 parallelism everywhere. Each of my 100 sources gives only a
> > few lines (say 14 max), and only the first 14 next nodes will
> > receive data.
> > > Same problem by replacing rebalance() with shuffle().
> > >
> > > But I found a workaround: setting parallelism to 1 for the source
> > (I don't need a 100 directory scanners anyway), it forces the
> > rebalancing evenly between the mappers.
> > >
> > > Greetings,
> > > Arnaud
> > >
> > >
> > > -Message d'origine-
> > > De : Matthias J. Sax [mailto:mj...@apache.org
> > ]
> > > Envoyé : mercredi 2 septembre 2015 17:56
> > > À : user@flink.apache.org 
> > > Objet : Re: How to force the parallelism on small streams?
> > >
> > > Hi,
> > >
> > > If I understand you correctly, you want to have 100 mappers. Thus
> > you need to apply the .setParallelism() after .map()
> > >
> > >>
> >
>  addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
> > >> 00)
> > >
> > > The order of commands you used, set the dop for the source to 100
> > (which might be ignored, if the provided source function
> > "myFileSource" does not implements "ParallelSourceFunction"
> > interface). The dop for the mapper should be the default value.
> > >
> > > Using .rebalance() is absolutely correct. It distributes the
> > emitted tuples in a round robin fashion to all consumer tasks.
> > >
> > > -Matthias
> > >
> > > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> > >> Hi,
> > >>
> > >>
> > >>
> > >> I have a source that provides few items since it gives file names
> to
> > >> the mappers. The mapper opens the file and process records. As the
> > >> files are huge, one input line (a filename) gives a consequent
> > work to the next stage.
> > >>
> > >> My topology looks like :
> > >>
> > >>
> >
>  addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
> > >> er)
> > >>
> > >> If 100 mappers are created, about 85 end immediately and only a
> few
> > >> process the files (for hours). I suspect an optimization making
> that
> > >> there is a minimum number of lines to pass to the next node or it
> is
> > >> “shutdown” ; but in my case I do want the lines to be evenly
> > >> distributed to each mapper.
> > >>
> > >> How to 

Re: How to force the parallelism on small streams?

2015-09-03 Thread Fabian Hueske
The purpose of rebalance() should be to rebalance the partitions of a data
stream as evenly as possible, right?
If all senders start sending to the same receiver and each sender emits fewer
elements than there are receivers, the partitions are not evenly rebalanced.
That is exactly the problem Arnaud ran into.

IMO, that's a bug and should be fixed.

2015-09-03 15:53 GMT+02:00 Matthias J. Sax :

> For rebalance() this makes sense. I don't think anything must be
> changed. For regular data, there is no such issues as for this very
> small data set.
>
> However for shuffle() I would expect that each source task uses a
> different shuffle pattern...
>
> -Matthias
>
> On 09/03/2015 03:28 PM, Fabian Hueske wrote:
> > In case of rebalance(), all sources start the round-robin partitioning at
> > index 0. Since each source emits only very few elements, only the first
> 15
> > mappers receive any input.
> > It would be better to let each source start the round-robin partitioning
> at
> > a different index, something like startIdx = (numReceivers / numSenders)
> *
> > myIdx.
> >
> > In case of shuffle(), the ShufflePartitioner initializes Random()
> without a
> > seed (the current time is taken).
> > However, the ShufflePartitioner is only initialized once at the client
> side
> > (if I see that correctly) and then the same instance is deserialized by
> all
> > operators, i.e., all use random number generators with the same seed.
> >
> > I think, the StreamPartitioner class should be extended with a
> > configuration / initialize method which is called on each parallel
> operator.
> >
> > Cheers, Fabian
> >
> > 2015-09-03 15:04 GMT+02:00 Aljoscha Krettek :
> >
> >> Hi,
> >> I don't think it's a bug. If there are 100 sources that each emit only
> 14
> >> elements then only the first 14 mappers will ever receive data. The
> >> round-robin distribution is not global, since the sources operate
> >> independently from each other.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax  wrote:
> >>
> >>> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
> >>> it redistributes randomly and not in round robin fashion.
> >>>
> >>> However, the problem you describe sounds like a bug to me. I included
> >>> dev list. Maybe anyone else can step in so we can identify it there is
> a
> >>> bug or not.
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>  Hi,
> 
>  You are right, but in fact it does not solve my problem, since I have
> >>> 100 parallelism everywhere. Each of my 100 sources gives only a few
> lines
> >>> (say 14 max), and only the first 14 next nodes will receive data.
>  Same problem by replacing rebalance() with shuffle().
> 
>  But I found a workaround: setting parallelism to 1 for the source (I
> >>> don't need a 100 directory scanners anyway), it forces the rebalancing
> >>> evenly between the mappers.
> 
>  Greetings,
>  Arnaud
> 
> 
>  -Message d'origine-
>  De : Matthias J. Sax [mailto:mj...@apache.org]
>  Envoyé : mercredi 2 septembre 2015 17:56
>  À : user@flink.apache.org
>  Objet : Re: How to force the parallelism on small streams?
> 
>  Hi,
> 
>  If I understand you correctly, you want to have 100 mappers. Thus you
> >>> need to apply the .setParallelism() after .map()
> 
> >
> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
> > 00)
> 
>  The order of commands you used, set the dop for the source to 100
> >> (which
> >>> might be ignored, if the provided source function "myFileSource" does
> not
> >>> implements "ParallelSourceFunction" interface). The dop for the mapper
> >>> should be the default value.
> 
>  Using .rebalance() is absolutely correct. It distributes the emitted
> >>> tuples in a round robin fashion to all consumer tasks.
> 
>  -Matthias
> 
>  On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> > Hi,
> >
> >
> >
> > I have a source that provides few items since it gives file names to
> > the mappers. The mapper opens the file and process records. As the
> > files are huge, one input line (a filename) gives a consequent work
> to
> >>> the next stage.
> >
> > My topology looks like :
> >
> >
> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
> > er)
> >
> > If 100 mappers are created, about 85 end immediately and only a few
> > process the files (for hours). I suspect an optimization making that
> > there is a minimum number of lines to pass to the next node or it is
> > “shutdown” ; but in my case I do want the lines to be evenly
> > distributed to each mapper.
> >
> > How to enforce that ?
> >
> >
> >
> > Greetings,
> >
> > Arnaud
> >
> >
> >
> --
> > --

Re: when use broadcast variable and run on bigdata display this error please help

2015-09-03 Thread Stephan Ewen
Chiwan has a good point. Once the data that needs to be available to all
machines is too large for one machine, there is no good solution any more.
The best approach is an external store to which all nodes have access. It
is not going to be terribly fast, though.

If you are in the situation that you need to broadcast so much data, you
may want to rethink your approach to the problem in the first place. Is
there no solution that can work with partitioned data? Even at the cost of
re-partitioning twice or so?
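
As a rough sketch of that external-store approach (MyKeyValueClient below is
a hypothetical client for whatever store all nodes can reach, not a Flink or
third-party API), a rich function can open the connection once per parallel
task and look the data up per record:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class ExternalLookupFilter extends RichFilterFunction<String> {

    private transient MyKeyValueClient client; // hypothetical external-store client

    @Override
    public void open(Configuration parameters) throws Exception {
        // connect once per parallel task, not once per record
        client = MyKeyValueClient.connect("store-host:6379"); // hypothetical address and API
    }

    @Override
    public boolean filter(String value) throws Exception {
        return client.contains(value); // look up the "broadcast" data remotely
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}

The same pattern works for RichMapFunction and the other rich functions
Chiwan mentions.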

On Thu, Sep 3, 2015 at 10:35 AM, Chiwan Park  wrote:

> Hi hagersaleh,
>
> Sorry for late reply.
>
> I think using an external system could be a solution for large scale data.
> To use an external system, you have to implement rich functions such as
> RichFilterFunction, RichMapFunction, …, etc.
>
>
> Regards,
> Chiwan Park
>
>
> > On Aug 30, 2015, at 1:30 AM, hagersaleh  wrote:
> >
> > where are any ways for use broadcast variable with bigdata
> >
> >
> >
> > --
> > View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/when-use-broadcast-variable-and-run-on-bigdata-display-this-error-please-help-tp2455p2566.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>
>
>
>


Re: Usage of Hadoop 2.2.0

2015-09-03 Thread Robert Metzger
I think most cloud providers moved beyond Hadoop 2.2.0.
Google's Click-To-Deploy is on 2.4.1
AWS EMR is on 2.6.0

The situation for the distributions seems to be the following:
MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)

HDP 2.0  (October 2013) is using 2.2.0
HDP 2.1 (April 2014) uses 2.4.0 already

So both vendors and cloud providers are multiple releases away from Hadoop
2.2.0.

Spark does not offer a binary distribution lower than 2.3.0.

In addition to that, I don't think the HDFS client in 2.2.0 is really
usable in production environments. Users were reporting
ArrayIndexOutOfBounds exceptions for some jobs; I also ran into these
exceptions myself at times.

The easiest approach to resolve this issue would be (a) dropping support
for Hadoop 2.2.0.
An alternative approach (b) would be:
 - ship a binary version for Hadoop 2.3.0
 - make the source of Flink still compatible with 2.2.0, so that users can
compile a Hadoop 2.2.0 version if needed.

I would vote for approach (a).


On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann  wrote:

> While working on high availability (HA) for Flink's YARN execution I
> stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
> 2.3.0, Hadoop introduced new functionality which is required for an
> efficient HA implementation. Therefore, I was wondering whether there is
> actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively
> used by someone?
>
> Cheers,
> Till
>


Re: Hardware requirements and learning resources

2015-09-03 Thread Kostas Tzoumas
Well hidden.

I have now added a link to the menu of http://data-artisans.com/. This
material is provided for free by data Artisans, but it is not part of the
official Apache Flink project.

On Thu, Sep 3, 2015 at 2:20 PM, Stefan Winterstein <
stefan.winterst...@dfki.de> wrote:

>
> > Answering to myself, I have found some nice training material at
> > http://dataartisans.github.io/flink-training.
>
> Excellent resources! Somehow, I managed not to stumble over them by
> myself - either I was blind, or they are well hidden... :)
>
>
> Best,
> -Stefan
>
>


Re: when use broadcast variable and run on bigdata display this error please help

2015-09-03 Thread hagersaleh
Hi Chiwan Park,
I do not understand this solution. Please explain more.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/when-use-broadcast-variable-and-run-on-bigdata-display-this-error-please-help-tp2455p2676.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Question on flink and hdfs

2015-09-03 Thread Jerry Peng
Hello,

Does Flink require HDFS to run? I know you can use HDFS to checkpoint and
process files in a distributed fashion. So can Flink run standalone
without HDFS?


Re: Question on flink and hdfs

2015-09-03 Thread Márton Balassi
Hi Jerry,

Yes, you can.

Best,

Marton

On Thu, Sep 3, 2015 at 8:57 PM, Jerry Peng 
wrote:

> Hello,
>
> Does flink require hdfs to run? I know you can use hdfs to checkpoint and
> process files in a distributed fashion.  So can flink run standalone
> without hdfs?
>


Re: Question on flink and hdfs

2015-09-03 Thread Stephan Ewen
Hi!

Yes, you can run Flink completely without HDFS.

Also the checkpointing can put state into any file system, like S3, or a
Unix file system (like a NAS or Amazon EBS), or even Tachyon.
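
A minimal sketch, assuming the file-system state backend is selected in
flink-conf.yaml (the exact configuration keys can differ between Flink
versions, and the paths are placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingWithoutHdfs {
    public static void main(String[] args) throws Exception {
        // flink-conf.yaml (placeholder paths; any shared file system URI should work):
        //   state.backend: filesystem
        //   state.backend.fs.checkpointdir: file:///data/flink/checkpoints  (or s3://..., hdfs://...)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // draw a checkpoint every 5 seconds

        env.fromElements("a", "b", "c").print();
        env.execute("checkpointing without HDFS");
    }
}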

Greetings,
Stephan


On Thu, Sep 3, 2015 at 8:57 PM, Jerry Peng 
wrote:

> Hello,
>
> Does flink require hdfs to run? I know you can use hdfs to checkpoint and
> process files in a distributed fashion.  So can flink run standalone
> without hdfs?
>


Re: Multiple restarts of Local Cluster

2015-09-03 Thread Stephan Ewen
Stopping the JVM process cleans up all resources, except temp files.

Everything that creates temp files uses a shutdown hook to remove these:
IOManager, BlobManager, LibraryCache, ...

On Wed, Sep 2, 2015 at 7:40 PM, Sachin Goel 
wrote:

> I'm not sure what you mean by "Crucial cleanup is in shutdown hooks".
> Could you elaborate?
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Wed, Sep 2, 2015 at 10:25 PM, Stephan Ewen  wrote:
>
>> You can always shut down a cluster manually (via shutdown()) and if the
>> JVM simply exists, all is well as well. Crucial cleanup is in shutdown
>> hooks.
>>
>> On Wed, Sep 2, 2015 at 6:22 PM, Till Rohrmann 
>> wrote:
>>
>>> If I'm not mistaken, then the cluster should be properly terminated when
>>> it gets garbage collected. Thus, also when the main method exits.
>>>
>>> On Wed, Sep 2, 2015 at 6:14 PM, Sachin Goel 
>>> wrote:
>>>
 If I'm right, all Tests use either the MultipleProgramTestBase or
 JavaProgramTestBase​. Those shut down the cluster explicitly anyway.
 I will make sure if this is the case.

 Regards
 Sachin

 -- Sachin Goel
 Computer Science, IIT Delhi
 m. +91-9871457685

 On Wed, Sep 2, 2015 at 9:40 PM, Till Rohrmann 
 wrote:

> Maybe we can create a single PlanExecutor for the LocalEnvironment
> which is used when calling execute. This of course entails that we
> don’t call stop on the LocalCluster. For cases where the program
> exits after calling execute, this should be fine because all resources 
> will
> then be released anyway. It might matter for the test execution where 
> maven
> reuses the JVMs and where the LocalFlinkMiniCluster won’t be garbage
> collected right away. You could try it out and see what happens.
>
> Cheers,
> Till
> ​
>
> On Wed, Sep 2, 2015 at 6:03 PM, Till Rohrmann 
> wrote:
>
>> Oh sorry, then I got the wrong context. I somehow thought it was
>> about test cases because I read `MultipleProgramTestBase` etc. Sorry my 
>> bad.
>>
>> On Wed, Sep 2, 2015 at 6:00 PM, Sachin Goel > > wrote:
>>
>>> I was under the impression that the @AfterClass annotation can only
>>> be used in test classes.
>>> Even so, the idea is that a user program running in the IDE should
>>> not be starting up the cluster several times [my primary concern is the
>>> addition of the persist operator], and we certainly cannot ask the user 
>>> to
>>> terminate the cluster after execution, while in local mode.
>>>
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>>
>>> On Wed, Sep 2, 2015 at 9:19 PM, Till Rohrmann 
>>> wrote:
>>>
 Why is it not possible to shut down the local cluster? Can’t you
 shut it down in the @AfterClass method?
 ​

 On Wed, Sep 2, 2015 at 4:56 PM, Sachin Goel <
 sachingoel0...@gmail.com> wrote:

> Yes. That will work too. However, then it isn't possible to shut
> down the local cluster. [Is it necessary to do so or does it shut down
> automatically when the program exists? I'm not entirely sure.]
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Wed, Sep 2, 2015 at 7:59 PM, Stephan Ewen 
> wrote:
>
>> Have a look at some other tests, like the checkpointing tests.
>> They start one cluster manually and keep it running. They connect 
>> against
>> it using the remote environment ("localhost",
>> miniCluster.getJobManagerRpcPort()).
>>
>> That works nicely...
>>
>> On Wed, Sep 2, 2015 at 4:23 PM, Sachin Goel <
>> sachingoel0...@gmail.com> wrote:
>>
>>> Hi all
>>> While using LocalEnvironment, in case the program triggers
>>> execution several times, the {{LocalFlinkMiniCluster}} is started 
>>> as many
>>> times. This can consume a lot of time in setting up and tearing 
>>> down the
>>> cluster. Further, this hinders with a new functionality I'm working 
>>> on
>>> based on persisted results.
>>> One potential solution could be to follow the methodology in
>>> `MultipleProgramsTestBase`. The user code then would have to reside 
>>> in a
>>> fixed function name, instead of the main method. Or is that too 
>>> cumbersome?
>>>
>>> Regards
>>> Sachin
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>>
>>
>>
>

>>>
>>
>

>>>
>>
>


Re: Multiple restarts of Local Cluster

2015-09-03 Thread Stephan Ewen
Have a look at the classes IOManager and IOManagerAsync; they are a good
example of how we use these hooks for cleanup.

The constructor usually installs them, and the shutdown logic removes them.
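
A minimal sketch of that pattern in plain Java (not Flink's actual code): the
constructor installs the hook so temp files are removed even if the JVM simply
exits, and an explicit shutdown removes the hook again.

import java.io.File;

public class TempFileService {

    private final File tempDir;
    private final Thread shutdownHook;

    public TempFileService(File tempDir) {
        this.tempDir = tempDir;
        // runs even when the JVM exits without an explicit shutdown() call
        this.shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                deleteTempFiles();
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    public void shutdown() {
        deleteTempFiles();
        try {
            // remove the hook so the cleanup does not run a second time
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // the JVM is already shutting down; the hook will run on its own
        }
    }

    private void deleteTempFiles() {
        File[] files = tempDir.listFiles();
        if (files != null) {
            for (File f : files) {
                f.delete();
            }
        }
        tempDir.delete();
    }
}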

On Thu, Sep 3, 2015 at 9:19 PM, Stephan Ewen  wrote:

> Stopping the JVM process clean up all resources, except temp files.
>
> Everything that creates temp files uses a shutdown hook to remove these:
> IOManager, BlobManager, LibraryCache, ...
>
> On Wed, Sep 2, 2015 at 7:40 PM, Sachin Goel 
> wrote:
>
>> I'm not sure what you mean by "Crucial cleanup is in shutdown hooks".
>> Could you elaborate?
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>>
>> On Wed, Sep 2, 2015 at 10:25 PM, Stephan Ewen  wrote:
>>
>>> You can always shut down a cluster manually (via shutdown()) and if the
>>> JVM simply exists, all is well as well. Crucial cleanup is in shutdown
>>> hooks.
>>>
>>> On Wed, Sep 2, 2015 at 6:22 PM, Till Rohrmann 
>>> wrote:
>>>
 If I'm not mistaken, then the cluster should be properly terminated
 when it gets garbage collected. Thus, also when the main method exits.

 On Wed, Sep 2, 2015 at 6:14 PM, Sachin Goel 
 wrote:

> If I'm right, all Tests use either the MultipleProgramTestBase or
> JavaProgramTestBase​. Those shut down the cluster explicitly anyway.
> I will make sure if this is the case.
>
> Regards
> Sachin
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Wed, Sep 2, 2015 at 9:40 PM, Till Rohrmann 
> wrote:
>
>> Maybe we can create a single PlanExecutor for the LocalEnvironment
>> which is used when calling execute. This of course entails that we
>> don’t call stop on the LocalCluster. For cases where the program
>> exits after calling execute, this should be fine because all resources 
>> will
>> then be released anyway. It might matter for the test execution where 
>> maven
>> reuses the JVMs and where the LocalFlinkMiniCluster won’t be garbage
>> collected right away. You could try it out and see what happens.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Wed, Sep 2, 2015 at 6:03 PM, Till Rohrmann 
>> wrote:
>>
>>> Oh sorry, then I got the wrong context. I somehow thought it was
>>> about test cases because I read `MultipleProgramTestBase` etc. Sorry my 
>>> bad.
>>>
>>> On Wed, Sep 2, 2015 at 6:00 PM, Sachin Goel <
>>> sachingoel0...@gmail.com> wrote:
>>>
 I was under the impression that the @AfterClass annotation can only
 be used in test classes.
 Even so, the idea is that a user program running in the IDE should
 not be starting up the cluster several times [my primary concern is the
 addition of the persist operator], and we certainly cannot ask the 
 user to
 terminate the cluster after execution, while in local mode.

 -- Sachin Goel
 Computer Science, IIT Delhi
 m. +91-9871457685

 On Wed, Sep 2, 2015 at 9:19 PM, Till Rohrmann >>> > wrote:

> Why is it not possible to shut down the local cluster? Can’t you
> shut it down in the @AfterClass method?
> ​
>
> On Wed, Sep 2, 2015 at 4:56 PM, Sachin Goel <
> sachingoel0...@gmail.com> wrote:
>
>> Yes. That will work too. However, then it isn't possible to shut
>> down the local cluster. [Is it necessary to do so or does it shut 
>> down
>> automatically when the program exists? I'm not entirely sure.]
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>>
>> On Wed, Sep 2, 2015 at 7:59 PM, Stephan Ewen 
>> wrote:
>>
>>> Have a look at some other tests, like the checkpointing tests.
>>> They start one cluster manually and keep it running. They connect 
>>> against
>>> it using the remote environment ("localhost",
>>> miniCluster.getJobManagerRpcPort()).
>>>
>>> That works nicely...
>>>
>>> On Wed, Sep 2, 2015 at 4:23 PM, Sachin Goel <
>>> sachingoel0...@gmail.com> wrote:
>>>
 Hi all
 While using LocalEnvironment, in case the program triggers
 execution several times, the {{LocalFlinkMiniCluster}} is started 
 as many
 times. This can consume a lot of time in setting up and tearing 
 down the
 cluster. Further, this hinders with a new functionality I'm 
 working on
 based on persisted results.
 One potential solution could be to follow the methodology in
 `MultipleProgramsTestBase`. The user code then would have to 
 reside in a
 fixed function name, instead of the main method. Or is that too 
 cumbe

Re: Usage of Hadoop 2.2.0

2015-09-03 Thread Ufuk Celebi
+1 to what Robert said.

On Thursday, September 3, 2015, Robert Metzger  wrote:

> I think most cloud providers moved beyond Hadoop 2.2.0.
> Google's Click-To-Deploy is on 2.4.1
> AWS EMR is on 2.6.0
>
> The situation for the distributions seems to be the following:
> MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
> CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
>
> HDP 2.0  (October 2013) is using 2.2.0
> HDP 2.1 (April 2014) uses 2.4.0 already
>
> So both vendors and cloud providers are multiple releases away from Hadoop
> 2.2.0.
>
> Spark does not offer a binary distribution lower than 2.3.0.
>
> In addition to that, I don't think that the HDFS client in 2.2.0 is really
> usable in production environments. Users were reporting
> ArrayIndexOutOfBounds exceptions for some jobs, I also had these exceptions
> sometimes.
>
> The easiest approach  to resolve this issue would be  (a) dropping the
> support for Hadoop 2.2.0
> An alternative approach (b) would be:
>  - ship a binary version for Hadoop 2.3.0
>  - make the source of Flink still compatible with 2.2.0, so that users can
> compile a Hadoop 2.2.0 version if needed.
>
> I would vote for approach (a).
>
>
> On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann  > wrote:
>
>> While working on high availability (HA) for Flink's YARN execution I
>> stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
>> 2.3.0, Hadoop introduced new functionality which is required for an
>> efficient HA implementation. Therefore, I was wondering whether there is
>> actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively
>> used by someone?
>>
>> Cheers,
>> Till
>>
>
>


Re: what different between join and coGroup in flink

2015-09-03 Thread Fabian Hueske
CoGroup is more generic than Join. You can perform a Join with CoGroup but
not do a CoGroup with a Join.
However, Join can be executed more efficiently than CoGroup.
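
A minimal sketch of both operators on the same inputs (the data and output
format are made up for illustration):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class JoinVsCoGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> left =
                env.fromElements(new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
        DataSet<Tuple2<Integer, String>> right =
                env.fromElements(new Tuple2<>(1, "x"), new Tuple2<>(3, "y"));

        // Join: one output record per matching pair of elements with the same key.
        left.join(right).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                @Override
                public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                    return l.f1 + "-" + r.f1;
                }
            }).print();

        // CoGroup: gets all elements of both inputs that share a key at once, so it
        // can also express outer joins or "key only on one side" logic.
        left.coGroup(right).where(0).equalTo(0)
            .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                @Override
                public void coGroup(Iterable<Tuple2<Integer, String>> ls,
                                    Iterable<Tuple2<Integer, String>> rs,
                                    Collector<String> out) {
                    for (Tuple2<Integer, String> l : ls) {
                        for (Tuple2<Integer, String> r : rs) {
                            out.collect(l.f1 + "-" + r.f1); // inner-join semantics
                        }
                    }
                }
            }).print();
    }
}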

2015-09-03 22:28 GMT+02:00 hagersaleh :

> what different between join and coGroup in flink
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/what-different-between-join-and-coGroup-in-flink-tp2682.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )

2015-09-03 Thread Welly Tambunan
Hi Stephan,

That's good information to know. We will hit that throughput easily. Our
computation graph has a lot of chaining like this right now.
I think it's safe to minimize the chains for now.
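
For illustration, a minimal sketch of the two equivalent formulations
discussed in this thread (the data and functions are made up):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FilterMapVsFlatMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.getConfig().enableObjectReuse(); // skips the defensive copy between chained operators

        DataStream<String> lines = env.fromElements("a", "", "abc");

        // Variant 1: filter().map(), which Flink chains into one task.
        DataStream<Integer> viaFilterMap = lines
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) {
                        return !s.isEmpty();
                    }
                })
                .map(new MapFunction<String, Integer>() {
                    @Override
                    public Integer map(String s) {
                        return s.length();
                    }
                });

        // Variant 2: the same logic collapsed into a single flatMap operator.
        DataStream<Integer> viaFlatMap = lines
                .flatMap(new FlatMapFunction<String, Integer>() {
                    @Override
                    public void flatMap(String s, Collector<Integer> out) {
                        if (!s.isEmpty()) {
                            out.collect(s.length());
                        }
                    }
                });

        viaFilterMap.print();
        viaFlatMap.print();
        env.execute("filter+map vs flatMap");
    }
}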

Thanks a lot for this Stephan.

Cheers

On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen  wrote:

> In a set of benchmarks a while back, we found that the chaining mechanism
> has some overhead right now, because of its abstraction. The abstraction
> creates iterators for each element and makes it hard for the JIT to
> specialize on the operators in the chain.
>
> For purely local chains at full speed, this overhead is observable (can
> decrease throughput from 25mio elements/core to 15-20mio elements per
> core). If your job does not reach that throughput, or is I/O bound, source
> bound, etc, it does not matter.
>
> If you care about super high performance, collapsing the code into one
> function helps.
>
> On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan  wrote:
>
>> Hi Gyula,
>>
>> Thanks for your response. Seems i will use filter and map for now as that
>> one is really make the intention clear, and not a big performance hit.
>>
>> Thanks again.
>>
>> Cheers
>>
>> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra  wrote:
>>
>>> Hey Welly,
>>>
>>> If you call filter and map one after the other like you mentioned, these
>>> operators will be chained and executed as if they were running in the same
>>> operator.
>>> The only small performance overhead comes from the fact that the output
>>> of the filter will be copied before passing it as input to the map to keep
>>> immutability guarantees (but no serialization/deserialization will happen).
>>> Copying might be practically free depending on your data type, though.
>>>
>>> If you are using operators that don't make use of the immutability of
>>> inputs/outputs (i.e you don't hold references to those values) than you can
>>> disable copying altogether by calling env.getConfig().enableObjectReuse(),
>>> in which case they will have exactly the same performance.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> Welly Tambunan  ezt írta (időpont: 2015. szept. 3.,
>>> Cs, 4:33):
>>>
 Hi All,

 I would like to filter some item from the event stream. I think there
 are two ways doing this.

 Using the regular pipeline filter(...).map(...). We can also use
 flatMap for doing both in the same operator.

 Any performance improvement if we are using flatMap ? As that will be
 done in one operator instance.


 Cheers


 --
 Welly Tambunan
 Triplelands

 http://weltam.wordpress.com
 http://www.triplelands.com 

>>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Usage of Hadoop 2.2.0

2015-09-03 Thread Chiwan Park
+1 for dropping Hadoop 2.2.0

Regards,
Chiwan Park

> On Sep 4, 2015, at 5:58 AM, Ufuk Celebi  wrote:
> 
> +1 to what Robert said.
> 
> On Thursday, September 3, 2015, Robert Metzger  wrote:
> I think most cloud providers moved beyond Hadoop 2.2.0.
> Google's Click-To-Deploy is on 2.4.1
> AWS EMR is on 2.6.0
> 
> The situation for the distributions seems to be the following:
> MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
> CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
> 
> HDP 2.0  (October 2013) is using 2.2.0
> HDP 2.1 (April 2014) uses 2.4.0 already
> 
> So both vendors and cloud providers are multiple releases away from Hadoop 
> 2.2.0.
> 
> Spark does not offer a binary distribution lower than 2.3.0.
> 
> In addition to that, I don't think that the HDFS client in 2.2.0 is really 
> usable in production environments. Users were reporting ArrayIndexOutOfBounds 
> exceptions for some jobs, I also had these exceptions sometimes.
> 
> The easiest approach  to resolve this issue would be  (a) dropping the 
> support for Hadoop 2.2.0
> An alternative approach (b) would be:
>  - ship a binary version for Hadoop 2.3.0
>  - make the source of Flink still compatible with 2.2.0, so that users can 
> compile a Hadoop 2.2.0 version if needed.
> 
> I would vote for approach (a).
> 
> 
> On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann  wrote:
> While working on high availability (HA) for Flink's YARN execution I stumbled 
> across some limitations with Hadoop 2.2.0. From version 2.2.0 to 2.3.0, 
> Hadoop introduced new functionality which is required for an efficient HA 
> implementation. Therefore, I was wondering whether there is actually a need 
> to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively used by someone?
> 
> Cheers,
> Till
>