Issue with running Flink Python jobs on cluster

2016-07-12 Thread Geoffrey Mon
Hello all,

I've set up Flink on a very small cluster of one master node and five
worker nodes, following the instructions in the documentation (
https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
I can run the included examples like WordCount and PageRank across the
entire cluster, but when I try to run simple Python examples, I sometimes
get a strange error in the first PythonMapPartition, concerning the temporary
folders that hold the data streams exchanged between Python and Java.

If I run jobs only on the taskmanager on the master node, Python examples
run fine. However, if the jobs use the worker nodes, I get the
following error:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)

Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: External process for task MapPartition (PythonMap) terminated
prematurely.
python: can't open file
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
[Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)

Caused by: java.lang.RuntimeException: External process for task
MapPartition (PythonMap) terminated prematurely.
python: can't open file
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
[Errno 2] No such file or directory
at
org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at
org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at
org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
... 5 more

I suspect this issue has something to do with how data is sent between
the master and the workers, but I haven't been able to find any solutions.
Presumably the temporary files were not received properly and thus were never
created?

Thanks in advance.

Cheers,
Geoffrey


Re: Parameters to Control Intra-node Parallelism

2016-07-12 Thread Saliya Ekanayake
Hi Ovidiu,

Checking /var/log/messages based on Greg's response revealed that the TMs were
killed due to running out of memory. Here's the node architecture: each node has
128GB of RAM. I was trying to run 2 TMs per node, binding each to 12 cores
(i.e. 1 socket). The total number of nodes was 16. I finally managed to get
it working with the following (non-default) settings.

taskmanager.heap.mb: 12288
taskmanager.numberOfTaskSlots: 12
akka.ask.timeout: 1000 s
taskmanager.network.numberOfBuffers: 36864

Note the number-of-buffers value: it had to be higher (twice as high, in this
case) than what Flink suggests (#slots-per-TM^2 * #TMs * 4, which
would be 12*12*32*4 = 18432). Otherwise it would throw the "not enough
buffers" error.
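
For reference, a minimal sketch of that arithmetic as a tiny Java program (the
heuristic is the one cited above; the doubling is just the empirical workaround
described in this thread):

public class BufferEstimate {
    public static void main(String[] args) {
        int slotsPerTM = 12;        // taskmanager.numberOfTaskSlots
        int taskManagers = 16 * 2;  // 16 nodes x 2 TMs per node = 32
        // documented heuristic: #slots-per-TM^2 * #TMs * 4
        int suggested = slotsPerTM * slotsPerTM * taskManagers * 4; // 18432
        int configured = 2 * suggested;                             // 36864
        System.out.println(suggested + " suggested, " + configured + " configured");
    }
}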

Thank you,
Saliya


On Tue, Jul 12, 2016 at 7:39 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi,
>
> Can you post your configuration parameters (exclude default settings) and
> cluster description?
>
> Best,
> Ovidiu
>
> On 11 Jul 2016, at 17:49, Saliya Ekanayake  wrote:
>
> Thank you Greg, I'll check if this was the cause for my TMs to disappear.
>
> On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan  wrote:
>
>> The OOM killer doesn't give warning so you'll need to call dmesg or look
>> in /var/log/messages or similar. The following reports that Debian flavors
>> may use /var/log/syslog.
>>
>> http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer
>>
>> On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake 
>> wrote:
>>
>>> Greg,
>>>
>>> where did you see the OOM log as shown in this mail thread? In my case
>>> none of the TaskManagers nor the JobManager reports an error like this.
>>>
>>> On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan  wrote:
>>>
 These symptoms sound similar to what I was experiencing in the
 following thread. Flink can have some unexpected memory usage which can
 result in an OOM kill by the kernel, and this becomes more pronounced as
 the cluster size grows.
   https://www.mail-archive.com/dev@flink.apache.org/msg06346.html

 On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake 
 wrote:

> I checked, but JVMs didn't crash. No puppet or other services like
> that.
>
> One thing I found is that things work OK when I have a smaller number
> of slaves. For example, here I was trying to run on 16 nodes giving 2 TMs
> each. Then I reduced it to 4 nodes each with 2 TMs, which worked.
>
>
>
> On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger 
> wrote:
>
>> Hi,
>> from the TaskManager logs, I cannot see anything suspicious.
>> It's a bit weird that the TaskManager logs just end, without any
>> shutdown messages. Usually the TMs log some shutdown messages when they are
>> stopping.
>> Also, if they were still running, I would expect some error
>> messages from akka about the connection status.
>> So the only thing I conclude is that one of the TMs was killed by the
>> OS or the JVM crashed. Did you check if that happened?
>>
>> Do you have any service like puppet that is controlling processes?
>>
>>
>> On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake 
>> wrote:
>>
>>> I see two logs (attached), but there's only 1 TaskManager process.
>>> Also, the Web console says it can find only 1 TM.
>>>
>>> However, I see this part in JM log, which shows there was a second
>>> TM at one point, but it was unregistered. Any thoughts?
>>>
>>> --
>>>
>>> - Registered TaskManager at j-002 (akka.tcp://
>>> flink@172.16.0.2:42888/user/taskmanager) as
>>> 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 
>>> 1.
>>> Current number of alive task slots is 12.
>>>
>>> 2016-07-07 11:32:40,363 WARN  akka.remote.ReliableDeliverySupervisor
>>> - Association with remote system [akka.tcp://flink@172.16.0.2:42888]
>>> has failed, address is now gated for [5000] ms. Reason is: 
>>> [Disassociated].
>>>
>>> 2016-07-07 11:32:42,722 INFO
>>>  org.apache.flink.runtime.instance.InstanceManager - Registered 
>>> TaskManager
>>> at j-002 (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as
>>> 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 
>>> 2.
>>> Current number of alive task slots is 24.
>>>
>>> 2016-07-07 11:33:15,316 WARN  Remoting - Tried to associate with
>>> unreachable remote address [akka.tcp://flink@172.16.0.2:42888].
>>> Address is now gated for 5000 ms, all messages to this address will be
>>> delivered to dead letters. Reason: Connection refused: /
>>> 172.16.0.2:42888
>>>
>>> 2016-07-07 11:33:15,320 INFO
>>>  org.apache.flink.runtime.jobmanager.JobManager - Task manager 
>>> akka.tcp://
>>> flink@172.16.0.2:42888/user/taskmanager terminated.
>>> 2016-07-07 11:33:15,

[Discuss] Ordering of Records

2016-07-12 Thread vinay patil
Hi,

Here are some of the queries I have :

I have two different streams, stream1 and stream2, in which the elements are
in order.

1) Now when I do keyBy on each of these streams, will the order be
maintained? (Since every group here will be sent to one task manager only.)
My understanding is that the records will be in order within a group; please
correct me if I'm wrong.

2) After the keyBy on both of the streams I am doing a co-group to get the
matching and non-matching records. Will the order be maintained here as well,
since this also works on a KeyedStream?
I am using EventTime and an AscendingTimestampExtractor for generating
timestamps and watermarks.

3) Now I want to perform the sequence check on the
matching/non-matching stream I get from 2) using map/flatMap.
Do I need to perform the keyBy again here, or if I keep the operators in a
chain, will the matching/non-matching stream run in the same TaskManager?
My understanding is that chaining will work here; please correct me, as I'm
getting confused. (A sketch of this pipeline follows below the questions.)

4) slotSharingGroup - can you please describe this in more detail?
According to the doc it "Sets the slot sharing group of this operation.
Parallel instances of operations that are in the same slot sharing group
will be co-located in the same TaskManager slot, if possible."
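
A minimal sketch of the pipeline being asked about (assuming Flink 1.1-style
DataStream APIs; all names, the element types, and the window size here are
hypothetical, just to illustrate the keyBy/coGroup/chained-map shape):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // hypothetical ordered inputs: (key, event timestamp)
        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(Tuple2.of("a", 1000L), Tuple2.of("a", 2000L))
                .assignTimestampsAndWatermarks(new TsExtractor());
        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(Tuple2.of("a", 1500L), Tuple2.of("b", 2500L))
                .assignTimestampsAndWatermarks(new TsExtractor());

        KeySelector<Tuple2<String, Long>, String> byKey =
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> e) { return e.f0; }
                };

        // questions 1 + 2: per-key co-group over event-time windows
        DataStream<String> matchedOrNot = stream1
                .coGroup(stream2)
                .where(byKey)
                .equalTo(byKey)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first,
                                        Iterable<Tuple2<String, Long>> second,
                                        Collector<String> out) {
                        boolean matching = first.iterator().hasNext()
                                && second.iterator().hasNext();
                        out.collect(matching ? "matching" : "non-matching");
                    }
                });

        // question 3: as long as this map stays chained to the operator above,
        // it runs in the same task, so no second keyBy is needed for chaining.
        // question 4: a slot sharing group could be set per operator, e.g.
        // .map(...).slotSharingGroup("myGroup")
        matchedOrNot
                .map(s -> "sequence-checked: " + s)
                .print();

        env.execute("ordering sketch");
    }

    private static class TsExtractor extends AscendingTimestampExtractor<Tuple2<String, Long>> {
        @Override
        public long extractAscendingTimestamp(Tuple2<String, Long> element) {
            return element.f1;
        }
    }
}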


Regards,
Vinay Patil





HDFS to Kafka

2016-07-12 Thread Dominique Rondé

Hi folks,

at first glance I have a very simple problem. I'd like to get datasets
out of some text files in HDFS and send them to a Kafka topic. I use the
following code to do that:


DataStream<String> hdfsDatasource = env.readTextFile("hdfs://"
        + parameterTool.getRequired("hdfs_env") + "/user/flink/"
        + parameterTool.getRequired("hdfs_path") + "/");
hdfsDatasource.addSink(new FlinkKafkaProducer08<String>(
        parameterTool.getRequired("brokerlist"),
        parameterTool.getRequired("topic"),
        new SimpleStringSchema()));


Everything works fine, but I need a way to go recursively through
the source folder and find text files in subfolders. For my batch
routines this works fine with "recursive.file.enumeration", but in the
streaming environment it is not possible to pass this configuration to
the readTextFile method.
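
For comparison, a sketch of the batch variant using the
"recursive.file.enumeration" parameter mentioned above (the HDFS path is
hypothetical):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // enable recursive traversal of the input directory (batch API only)
        Configuration cfg = new Configuration();
        cfg.setBoolean("recursive.file.enumeration", true);

        DataSet<String> lines = env.readTextFile("hdfs:///user/flink/data")
                                   .withParameters(cfg);
        lines.print();
    }
}

The streaming readTextFile offers no such parameter hook, which is exactly the
gap described above.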


Can someone give me a hint?

Cheers

Dominique



Re: sampling function

2016-07-12 Thread Till Rohrmann
Stratified sampling would also be beneficial for the DataSet API. I think
it would be best if this method were also added to DataSetUtils or made
available via the flink-contrib module. Furthermore, I think it would be
easiest if you created the JIRA for this feature yourself, because you know
what you want to add. For that you have to register at
https://issues.apache.org/jira, if you haven't done so already, and then we can
add you as a contributor. Based on the JIRA description we can then
discuss possible implementations.
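
For reference, the simple random sampling that already exists in DataSetUtils
can be used roughly like this (a minimal sketch; the fraction is arbitrary):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.DataSetUtils;

public class SampleSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Long> numbers = env.generateSequence(1, 10000);

        // simple random sample without replacement, keeping ~10% of the elements
        DataSet<Long> sample = DataSetUtils.sample(numbers, false, 0.1);
        sample.print();
    }
}

A stratified variant would, roughly speaking, apply such sampling per
key/stratum instead of over the whole DataSet.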

Cheers,
Till

On Tue, Jul 12, 2016 at 12:11 PM, Paris Carbone  wrote:

> Hey Do,
>
> I think that more sophisticated samplers would make a better fit in the ML
> library rather than in the core API, but I am not very familiar with the
> milestones there.
> Maybe the maintainers of the batch ML library could check whether sampling
> techniques would be useful there, I guess.
>
> Paris
>
> > On 11 Jul 2016, at 16:15, Le Quoc Do  wrote:
> >
> > Hi all,
> >
> > Thank you all for your answers.
> > By the way, I also noticed that Flink doesn't support a "stratified
> > sampling" function (only simple random sampling) for DataSet.
> > It would be nice if someone could create a Jira for it and assign the task
> > to me so that I can work on it.
> >
> > Thank you,
> > Do
> >
> > On Mon, Jul 11, 2016 at 11:44 AM, Vasiliki Kalavri <
> > vasilikikala...@gmail.com> wrote:
> >
> >> Hi Do,
> >>
> >> Paris and Martha worked on sampling techniques for data streams on Flink
> >> last year. If you want to implement your own samplers, you might find
> >> Martha's master thesis helpful [1].
> >>
> >> -Vasia.
> >>
> >> [1]: http://kth.diva-portal.org/smash/get/diva2:910695/FULLTEXT01.pdf
> >>
> >> On 11 July 2016 at 11:31, Kostas Kloudas 
> >> wrote:
> >>
> >>> Hi Do,
> >>>
> >>> In DataStream you can always implement your own
> >>> sampling function, hopefully without too much effort.
> >>>
> >>> Adding such functionality to the API could be a good idea.
> >>> But given that in sampling there is no “one-size-fits-all”
> >>> solution (as not every use case needs random sampling and not
> >>> all random samplers fit all workloads), I am not sure if we
> >>> should start adding different sampling operators.
> >>>
> >>> Thanks,
> >>> Kostas
> >>>
>  On Jul 9, 2016, at 5:43 PM, Greg Hogan  wrote:
> 
>  Hi Do,
> 
>  DataSet provides a stable @Public interface. DataSetUtils is marked
>  @PublicEvolving which is intended for public use, has stable behavior,
> >>> but
>  method signatures may change. It's also good to limit DataSet to
> common
>  methods whereas the utility methods tend to be used for specific
>  applications.
> 
>  I don't have the pulse of streaming but this sounds like a useful
> >> feature
>  that could be added.
> 
>  Greg
> 
>  On Sat, Jul 9, 2016 at 10:47 AM, Le Quoc Do 
> >> wrote:
> 
> > Hi all,
> >
> > I'm working on approximate computing using sampling techniques. I
> > recognized that Flink supports the sample function for Dataset
> > (org/apache/flink/api/java/utils/DataSetUtils.java). I'm just
> >> wondering
> >>> why
> > you didn't merge the function to
> >> org/apache/flink/api/java/DataSet.java
> > since the sample function works as a transformation operator?
> >
> > The second question is that are you planning to support the sample
> > function for DataStream (within windows) since I did not see it in
> > DataStream code ?
> >
> > Thank you,
> > Do
> >
> >>>
> >>>
> >>
>
>


Re: Killing Yarn Session Leaves Lingering Flink Jobs

2016-07-12 Thread Stephan Ewen
I think there is some confusion between how Flink thinks about HA and the job
life cycle, and how many users think about it.

Flink treats the killing of the YARN session as a failure of the job. So
as soon as new YARN resources become available, it tries to recover the job.
Most users, however, think that killing a YARN session is equivalent to
cancelling the job.

I am unsure whether we should start to interpret the killing of a YARN session
as a cancellation. Do YARN sessions never get killed accidentally, or as
the result of a YARN-related failure?

When running a single Flink job per YARN session, cancelling the Flink job also
shuts down the YARN session and hence shuts down everything properly.

Hope that train of thought helps.


On Tue, Jul 12, 2016 at 3:15 PM, Ufuk Celebi  wrote:

> Are you running in HA mode? If yes, that's the expected behaviour at
> the moment, because the ZooKeeper data is only cleaned up on a
> terminal state (FINISHED, FAILED, CANCELLED). You have to specify
> separate ZooKeeper root paths via "recovery.zookeeper.path.root".
> There is an issue which should be fixed for 1.2 to make this
> configurable in an easy way.
>
> On Tue, Jul 12, 2016 at 1:28 PM, Konstantin Gregor
>  wrote:
> > Hello everyone,
> >
> > I have a question concerning stopping Flink streaming processes that run
> > in a detached Yarn session.
> >
> > Here's what we do: We start a Yarn session via
> > yarn-session.sh -n 8 -d -jm 4096 -tm 1 -s 10 -qu flink_queue
> >
> > Then, we start our Flink streaming application via
> > flink run -p 65 -c SomeClass some.jar > /dev/null 2>&1  &
> >
> > The problem occurs when we stop the application.
> > If we stop the Flink application with
> flink cancel <jobID>
> and then kill the yarn application with
> yarn application -kill <applicationID>
> > everything is fine.
> But what we found was that when we only kill the yarn application
> without specifically canceling the Flink job first, the Flink job will
> stay lingering on the machine and use resources until it is killed
> manually via its process id.
> >
> > One thing that we tried was to stop using ephemeral ports for the
> application master, namely we set yarn.application-master.port
> > specifically to some port number, but the problem remains: Killing the
> > yarn application does not kill the corresponding Flink job.
> >
> > Does anyone have an idea about this? Any help is greatly appreciated :-)
> > By the way, our application reads data from a Kafka queue and writes it
> > into HDFS, maybe this is also important to know.
> >
> > Thank you and best regards
> >
> > Konstantin
> > --
> > Konstantin Gregor * konstantin.gre...@tngtech.com
> > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> > Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: Killing Yarn Session Leaves Lingering Flink Jobs

2016-07-12 Thread Ufuk Celebi
Are you running in HA mode? If yes, that's the expected behaviour at
the moment, because the ZooKeeper data is only cleaned up on a
terminal state (FINISHED, FAILED, CANCELLED). You have to specify
separate ZooKeeper root paths via "recovery.zookeeper.path.root".
There is an issue which should be fixed for 1.2 to make this
configurable in an easy way.
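
For example, giving each cluster its own root (the paths here are hypothetical):

# flink-conf.yaml of the first Yarn session
recovery.zookeeper.path.root: /flink/cluster-one

# flink-conf.yaml of the second Yarn session
recovery.zookeeper.path.root: /flink/cluster-two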

On Tue, Jul 12, 2016 at 1:28 PM, Konstantin Gregor
 wrote:
> Hello everyone,
>
> I have a question concerning stopping Flink streaming processes that run
> in a detached Yarn session.
>
> Here's what we do: We start a Yarn session via
> yarn-session.sh -n 8 -d -jm 4096 -tm 1 -s 10 -qu flink_queue
>
> Then, we start our Flink streaming application via
> flink run -p 65 -c SomeClass some.jar > /dev/null 2>&1  &
>
> The problem occurs when we stop the application.
> If we stop the Flink application with
> flink cancel <jobID>
> and then kill the yarn application with
> yarn application -kill <applicationID>
> everything is fine.
> But what we found was that when we only kill the yarn application
> without specifically canceling the Flink job first, the Flink job will
> stay lingering on the machine and use resources until it is killed
> manually via its process id.
>
> One thing that we tried was to stop using ephemeral ports for the
> application master, namely we set yarn.application-master.port
> specifically to some port number, but the problem remains: Killing the
> yarn application does not kill the corresponding Flink job.
>
> Does anyone have an idea about this? Any help is greatly appreciated :-)
> By the way, our application reads data from a Kafka queue and writes it
> into HDFS, maybe this is also important to know.
>
> Thank you and best regards
>
> Konstantin
> --
> Konstantin Gregor * konstantin.gre...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082


RE: error for building flink-runtime from source

2016-07-12 Thread Radu Tudoran
Hi,

I am building the 1.1 snapshot (which should be the latest release).
I will try to build the whole project to check whether it works.

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

From: Márton Balassi [mailto:balassi.mar...@gmail.com]
Sent: Tuesday, July 12, 2016 2:37 PM
To: user@flink.apache.org
Subject: Re: error for building flink-runtime from source

Hi Radu,

Which version of Flink are you building?

Looking at the current master builds, they have been coming in green recently [1].
If you are solely building flink-runtime, the issue might be that you are using
different versions of flink-core (a dependency of flink-runtime) and
flink-runtime.

Could you try building Flink as a whole?

[1] https://travis-ci.org/apache/flink/builds/144096299

Best,

Marton

On Tue, Jul 12, 2016 at 12:02 PM, Radu Tudoran wrote:
I am trying to build flink-runtime from source.

I run mvn install and the compilation fails with the error below.
Any thoughts on this?

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-runtime_2.10: Compilation failure
[ERROR] /D:/ISAR 
2/Dependencies/flink/flink-master/flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java:[60,49]
 cannot find symbol
[ERROR] symbol:   method getDelayBetweenAttempts()
[ERROR] location: variable fixedDelayConfig of type 
org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.


Best regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN




Re: error for building flink-runtime from source

2016-07-12 Thread Márton Balassi
Hi Radu,

Which version of Flink are you building?

Looking at the current master builds, they have been coming in green recently [1].
If you are solely building flink-runtime, the issue might be that you are
using different versions of flink-core (a dependency of flink-runtime) and
flink-runtime.

Could you try building Flink as a whole?

[1] https://travis-ci.org/apache/flink/builds/144096299
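
For example, from the repository root (a standard invocation; the skipTests
flag is optional and just speeds things up):

mvn clean install -DskipTests

Building from the root ensures that flink-core and flink-runtime are compiled
from the same source revision.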

Best,

Marton

On Tue, Jul 12, 2016 at 12:02 PM, Radu Tudoran 
wrote:

> I am trying to build flink-runtime from source.
>
>
>
> I run mvn install and the compilation fails with the error below.
>
> Any thoughts on this?
>
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
> (default-compile) on project flink-runtime_2.10: Compilation failure
>
> [ERROR] /D:/ISAR
> 2/Dependencies/flink/flink-master/flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java:[60,49]
> cannot find symbol
>
> [ERROR] symbol:   method getDelayBetweenAttempts()
>
> [ERROR] location: variable fixedDelayConfig of type
> org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration
>
> [ERROR] -> [Help 1]
>
> [ERROR]
>
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
>
>
>
>
>
> Best regards,
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer - Big Data Expert
>
> IT R&D Division
>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: radu.tudo...@huawei.com
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>
>
>
>


Re: Parameters to Control Intra-node Parallelism

2016-07-12 Thread Ovidiu-Cristian MARCU
Hi,

Can you post your configuration parameters (exclude default settings) and 
cluster description?

Best,
Ovidiu
> On 11 Jul 2016, at 17:49, Saliya Ekanayake wrote:
> 
> Thank you Greg, I'll check if this was the cause for my TMs to disappear.
> 
> On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan wrote:
> The OOM killer doesn't give warning, so you'll need to call dmesg or look in
> /var/log/messages or similar. The following reports that Debian flavors may
> use /var/log/syslog.
> 
> http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer
> 
> On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake wrote:
> Greg,
> 
> where did you see the OOM log as shown in this mail thread? In my case none
> of the TaskManagers nor the JobManager reports an error like this.
> 
> On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan wrote:
> These symptoms sound similar to what I was experiencing in the following
> thread. Flink can have some unexpected memory usage which can result in an
> OOM kill by the kernel, and this becomes more pronounced as the cluster size
> grows.
> 
> https://www.mail-archive.com/dev@flink.apache.org/msg06346.html
> 
> On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake wrote:
> I checked, but the JVMs didn't crash. No puppet or other services like that.
> 
> One thing I found is that things work OK when I have a smaller number of
> slaves. For example, here I was trying to run on 16 nodes with 2 TMs each.
> Then I reduced it to 4 nodes, each with 2 TMs, which worked.
> 
> On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger wrote:
> Hi,
> from the TaskManager logs, I cannot see anything suspicious.
> It's a bit weird that the TaskManager logs just end, without any shutdown
> messages. Usually the TMs log some shutdown messages when they are stopping.
> Also, if they were still running, I would expect some error messages from
> akka about the connection status.
> So the only thing I conclude is that one of the TMs was killed by the OS or
> the JVM crashed. Did you check if that happened?
> 
> Do you have any service like puppet that is controlling processes?
> 
> On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake wrote:
> I see two logs (attached), but there's only 1 TaskManager process. Also, the
> Web console says it can find only 1 TM.
> 
> However, I see this part in the JM log, which shows there was a second TM at
> one point, but it was unregistered. Any thoughts?
> 
> --
> 
> - Registered TaskManager at j-002
> (akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as
> 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 1.
> Current number of alive task slots is 12.
> 
> 2016-07-07 11:32:40,363 WARN  akka.remote.ReliableDeliverySupervisor -
> Association with remote system [akka.tcp://flink@172.16.0.2:42888] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 
> 2016-07-07 11:32:42,722 INFO  org.apache.flink.runtime.instance.InstanceManager
> - Registered TaskManager at j-002
> (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as
> 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 2.
> Current number of alive task slots is 24.
> 
> 2016-07-07 11:33:15,316 WARN  Remoting - Tried to associate with unreachable
> remote address [akka.tcp://flink@172.16.0.2:42888]. Address is now gated for
> 5000 ms, all messages to this address will be delivered to dead letters.
> Reason: Connection refused: /172.16.0.2:42888
> 
> 2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager terminated.
> 2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.instance.InstanceManager
> - Unregistered task manager
> akka.tcp://flink@172.16.0.2:42888/user/taskmanager. Number of registered task
> managers 1. Number of available slots 12.
> 
> On Thu, Jul 7, 2016 at 4:27 AM, Ufuk Celebi wrote:
> No that should suffice. Can you check whether there are any task manager
> logs for the second TM on that machine (taskmanager-X-j-011.log where X is
> the TM number)? If yes, the task manager process does start up and there is
> another problem. If not, the task managers seem not to start at all.
> 
> – Ufuk
> 
> On Thu, Jul

Killing Yarn Session Leaves Lingering Flink Jobs

2016-07-12 Thread Konstantin Gregor
Hello everyone,

I have a question concerning stopping Flink streaming processes that run
in a detached Yarn session.

Here's what we do: We start a Yarn session via
yarn-session.sh -n 8 -d -jm 4096 -tm 1 -s 10 -qu flink_queue

Then, we start our Flink streaming application via
flink run -p 65 -c SomeClass some.jar > /dev/null 2>&1  &

The problem occurs when we stop the application.
If we stop the Flink application with
flink cancel <jobID>
and then kill the yarn application with
yarn application -kill <applicationID>
everything is fine.
But what we found was that when we only kill the yarn application
without specifically canceling the Flink job first, the Flink job will
stay lingering on the machine and use resources until it is killed
manually via its process id.

One thing that we tried was to stop using ephemeral ports for the
application master, namely we set yarn.application-master.port
specifically to some port number, but the problem remains: Killing the
yarn application does not kill the corresponding Flink job.

Does anyone have an idea about this? Any help is greatly appreciated :-)
By the way, our application reads data from a Kafka queue and writes it
into HDFS, maybe this is also important to know.

Thank you and best regards

Konstantin
-- 
Konstantin Gregor * konstantin.gre...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: sampling function

2016-07-12 Thread Paris Carbone
Hey Do,

I think that more sophisticated samplers would make a better fit in the ML
library rather than in the core API, but I am not very familiar with the
milestones there.
Maybe the maintainers of the batch ML library could check whether sampling
techniques would be useful there, I guess.

Paris

> On 11 Jul 2016, at 16:15, Le Quoc Do  wrote:
> 
> Hi all,
> 
> Thank you all for your answers.
> By the way, I also noticed that Flink doesn't support a "stratified
> sampling" function (only simple random sampling) for DataSet.
> It would be nice if someone could create a Jira for it and assign the task
> to me so that I can work on it.
> 
> Thank you,
> Do
> 
> On Mon, Jul 11, 2016 at 11:44 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
> 
>> Hi Do,
>> 
>> Paris and Martha worked on sampling techniques for data streams on Flink
>> last year. If you want to implement your own samplers, you might find
>> Martha's master thesis helpful [1].
>> 
>> -Vasia.
>> 
>> [1]: http://kth.diva-portal.org/smash/get/diva2:910695/FULLTEXT01.pdf
>> 
>> On 11 July 2016 at 11:31, Kostas Kloudas 
>> wrote:
>> 
>>> Hi Do,
>>> 
>>> In DataStream you can always implement your own
>>> sampling function, hopefully without too much effort.
>>> 
>>> Adding such functionality to the API could be a good idea.
>>> But given that in sampling there is no “one-size-fits-all”
>>> solution (as not every use case needs random sampling and not
>>> all random samplers fit all workloads), I am not sure if we
>>> should start adding different sampling operators.
>>> 
>>> Thanks,
>>> Kostas
>>> 
 On Jul 9, 2016, at 5:43 PM, Greg Hogan  wrote:
 
 Hi Do,
 
 DataSet provides a stable @Public interface. DataSetUtils is marked
 @PublicEvolving which is intended for public use, has stable behavior,
>>> but
 method signatures may change. It's also good to limit DataSet to common
 methods whereas the utility methods tend to be used for specific
 applications.
 
 I don't have the pulse of streaming but this sounds like a useful
>> feature
 that could be added.
 
 Greg
 
 On Sat, Jul 9, 2016 at 10:47 AM, Le Quoc Do 
>> wrote:
 
> Hi all,
> 
> I'm working on approximate computing using sampling techniques. I
> recognized that Flink supports the sample function for Dataset
> (org/apache/flink/api/java/utils/DataSetUtils.java). I'm just
>> wondering
>>> why
> you didn't merge the function to
>> org/apache/flink/api/java/DataSet.java
> since the sample function works as a transformation operator?
> 
> The second question is that are you planning to support the sample
> function for DataStream (within windows) since I did not see it in
> DataStream code ?
> 
> Thank you,
> Do
> 
>>> 
>>> 
>> 



error for building flink-runtime from source

2016-07-12 Thread Radu Tudoran
I am trying to build flink-runtime from source.

I run mvn install and the compilation fails with the error below.
Any thoughts on this?

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-runtime_2.10: Compilation failure
[ERROR] /D:/ISAR 
2/Dependencies/flink/flink-master/flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java:[60,49]
 cannot find symbol
[ERROR] symbol:   method getDelayBetweenAttempts()
[ERROR] location: variable fixedDelayConfig of type 
org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.


Best regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN



Re: Create window before the first event

2016-07-12 Thread Kostas Kloudas
Hi Xiang, 

I think this is a duplicate of the discussion you opened yesterday,
so I am posting the same answer here in case somebody wants to contribute to
the discussion.

According to your code, you just put all your elements (no splitting by key)
into a single infinite window, and you apply your window function every 5 min
(after the first element has arrived).

The combination of the two means that if you have elements arriving at a steady
pace of 1 element/min, and your function just counts the elements seen so far,
then the result will be 5, 10, 15, ...

Under the hood, when the first element arrives, the trigger registers the first
timer to fire after 5 min. Then, on every firing, the trigger registers another
timer to fire after 5 min, and so on.

Your problem is that the first timer is only set when the first element arrives.
If you control your source, why don’t you put a dummy element in the beginning?
This will instantiate the global window and set the first timer.
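
A minimal sketch of that idea (the names here are hypothetical): wrap the
source so it emits one marker element immediately, which instantiates the
global window and makes the trigger register its first timer; the window
function then simply ignores the marker.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class DummyFirstSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emitted before any real data: creates the global window and lets
        // ContinuousProcessingTimeTrigger register its first 5-minute timer
        ctx.collect("__DUMMY__");
        while (running) {
            // ... emit the real elements here ...
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// in the apply function: skip elements equal to "__DUMMY__" when counting.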

Kostas

> On Jul 12, 2016, at 4:21 AM, Xiang Zhang  wrote:
> 
> Hi,
> 
> I am trying to have a trigger fire every 5 mins, even when no
> event comes (just output a default for the empty window). The closest
> solution I got to work is this:
> 
> datastream.windowAll(GlobalWindows.create())
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
>     .apply { MY_APPLY_FUNCTION }
> 
> On the master branch, this works after the first event appears; then it
> continuously fires every 5 mins even when there are no events (it does not
> work for release 1.0.3, though, due to the change in
> ContinuousProcessingTimeTrigger).
> 
> However, sometimes my first event may not appear in the first 5 mins. I
> still need to fire a default value first, but it seems the GlobalWindow is
> only created after seeing an event, so it only works after the first event.
> Is there any way I can create the window in the WindowAssigner before the
> first event comes in?
> 
> Thanks,
> Xiang
> 
> 
> 



Re: ContinuousProcessingTimeTrigger on empty

2016-07-12 Thread Kostas Kloudas
Hi Xiang,

According to your code, you just put all your elements (no splitting by key)
into a single infinite window, and you apply your window function every 5 min
(after the first element has arrived).

The combination of the two means that if you have elements arriving at a steady
pace of 1 element/min, and your function just counts the elements seen so far,
then the result will be 5, 10, 15, ...

Under the hood, when the first element arrives, the trigger registers the first
timer to fire after 5 min. Then, on every firing, the trigger registers another
timer to fire after 5 min, and so on.

Your problem is that the first timer is only set when the first element arrives.
If you control your source, why don’t you put a dummy element in the beginning?
This will instantiate the global window and set the first timer.

Kostas

> On Jul 11, 2016, at 8:39 PM, Xiang Zhang  wrote:
> 
> Hi Kostas,
> 
> Yes, so I tried GlobalWindows. Is it possible to trigger every 5 mins on
> GlobalWindows? From the comments in the source for
> ContinuousProcessingTimeTrigger, it says:
> * A {@link Trigger} that continuously fires based on a given time interval
> as measured by
> * the clock of the machine on which the job is running.
> 
> So what does ContinuousProcessingTimeTrigger do in GlobalWindows?
> 
> Xiang
> 
> 
> 
> 