Re: Task managers run on separate nodes in a cluster

2018-09-17 Thread Martin Eden
Thanks for the feedback Liu and Till.
@Liu Yeah that would work but unfortunately we run other services on the
cluster so it's not really an option.
@Till Will have a look and see how much time I can dedicate to this.
M

On Mon, Sep 17, 2018 at 7:21 AM Till Rohrmann  wrote:

> Hi Martin,
>
> I'm not aware that the community is actively working on enabling the
> balanced host attribute constraint. If you wanna give it a try, then I'm
> happy to review your contribution.
>
> Cheers,
> Till
>
> On Mon, Sep 17, 2018 at 5:28 AM Renjie Liu 
> wrote:
>
>> Hi, Martin:
>> I think a better solution would be to set the number of cores of each
>> container equal to that of a physical server, if this Mesos cluster is
>> dedicated to your Flink cluster.
>>
>> On Mon, Sep 17, 2018 at 5:28 AM Martin Eden 
>> wrote:
>>
>>> Hi Till,
>>>
>>> I was able to use mesos.constraints.hard.hostattribute to run all task
>>> managers on a particular host in my cluster.
>>>
>>> However, after looking a bit at the code, I'm not sure we can use
>>> mesos.constraints.hard.hostattribute for load balancing Flink task managers
>>> evenly across hosts in a Mesos cluster.
>>>
>>> This is because under the hood it uses the fenzo host attribute value
>>> constraint while we would need the fenzo balanced host attribute constraint.
>>>
>>> The LaunchableMesosWorker sets the constraints via
>>> the com.netflix.fenzo.TaskRequest and all of these hard constraints must be
>>> satisfied by a host for the task scheduler to assign this task to that
>>> host. Since the current implementation always returns the static constraint
>>> value configured, i.e. what comes after ":", see
>>> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters#addHostAttrValueConstraint,
>>> I don't see how we can use it to load balance, unless the constraint value
>>> were dynamic, based on some property of the Mesos task request.
>>>
>>> Am I correct in my assumptions?
>>>
>>> Any other way of load balancing?
>>> Maybe by not even using the DCOS Flink package (mesos flink framework)
>>> at all?
>>> Any plans to add support for the fenzo balanced host attribute
>>> constraint?
>>>
>>> Thanks,
>>> M
>>>
>>>
>>>
>>>
>>> On Fri, Sep 14, 2018 at 5:46 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Flink supports the mesos.constraints.hard.hostattribute to specify task
>>>> constraints based on agent attributes [1]. I think you could use them to
>>>> control the task placement.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-constraints-hard-hostattribute
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Sep 14, 2018 at 3:08 PM Martin Eden 
>>>> wrote:
>>>>
>>>>> Thanks Vino!
>>>>>
>>>>> On Fri, Sep 14, 2018 at 3:37 AM vino yang 
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> Till has done most of the work of Flink on Mesos. Ping Till for you.
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> Martin Eden  于2018年9月12日周三 下午11:21写道:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We're using Flink 1.3.2 with DCOS / Mesos.
>>>>>>>
>>>>>>> We have a 3 node cluster and are running the Flink DCOS package
>>>>>>> (Flink Mesos framework) configured with 3 Task Managers.
>>>>>>>
>>>>>>> Our goal is to run each of them on separate hosts for better load
>>>>>>> balancing but it seems the task managers end up running on the same 
>>>>>>> host.
>>>>>>>
>>>>>>> Looked around the docs and DCOS Flink package but could not find any
>>>>>>> placement policy or anything of the sorts.
>>>>>>>
>>>>>>> Is there anything like that?
>>>>>>>
>>>>>>> We are also planning to upgrade to the latest Flink version. Is
>>>>>>> something like that supported in this newer version?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
>>>>>>>
>>>>>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>


Re: Task managers run on separate nodes in a cluster

2018-09-16 Thread Martin Eden
Hi Till,

I was able to use mesos.constraints.hard.hostattribute to run all task
managers on a particular host in my cluster.

However, after looking a bit at the code, I'm not sure we can use
mesos.constraints.hard.hostattribute for load balancing Flink task managers
evenly across hosts in a Mesos cluster.

This is because under the hood it uses the fenzo host attribute value
constraint while we would need the fenzo balanced host attribute constraint.

The LaunchableMesosWorker sets the constraints via
the com.netflix.fenzo.TaskRequest and all of these hard constraints must be
satisfied by a host for the task scheduler to assign this task to that
host. Since the current implementation always returns the static constraint
value configured, i.e. what comes after ":", see
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters#addHostAttrValueConstraint,
I don't see how we can use it to load balance, unless the constraint value
were dynamic, based on some property of the Mesos task request.
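
For illustration, a rough and untested sketch of the difference, in case someone
wants to pick this up. The fenzo constructor signatures below are my assumption
from skimming the fenzo sources, so please double check them before relying on this:

import java.util.Set;

import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.functions.Func1;
import com.netflix.fenzo.plugins.BalancedHostAttrConstraint;
import com.netflix.fenzo.plugins.HostAttrValueConstraint;

public class ConstraintSketch {

    // Roughly what addHostAttrValueConstraint gives us today: every task asks
    // for the same static attribute value, e.g. "rack:r1" pins all task
    // managers to hosts whose "rack" attribute equals "r1".
    static ConstraintEvaluator hostAttrValue(String attrName, String attrValue) {
        return new HostAttrValueConstraint(attrName, taskId -> attrValue);
    }

    // Roughly what balanced placement would need instead: fenzo spreads the
    // group of co-scheduled tasks (here: the Flink task managers) evenly
    // across the distinct values of the given host attribute.
    static ConstraintEvaluator balanced(Func1<String, Set<String>> coTaskGetter,
                                        String attrName, int expectedAttrValues) {
        return new BalancedHostAttrConstraint(coTaskGetter, attrName, expectedAttrValues);
    }
}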

Am I correct in my assumptions?

Any other way of load balancing?
Maybe by not even using the DCOS Flink package (mesos flink framework) at
all?
Any plans to add support for the fenzo balanced host attribute constraint?

Thanks,
M




On Fri, Sep 14, 2018 at 5:46 PM Till Rohrmann  wrote:

> Hi Martin,
>
> Flink supports the mesos.constraints.hard.hostattribute to specify task
> constraints based on agent attributes [1]. I think you could use them to
> control the task placement.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-constraints-hard-hostattribute
>
> Cheers,
> Till
>
> On Fri, Sep 14, 2018 at 3:08 PM Martin Eden 
> wrote:
>
>> Thanks Vino!
>>
>> On Fri, Sep 14, 2018 at 3:37 AM vino yang  wrote:
>>
>>> Hi Martin,
>>>
>>> Till has done most of the work of Flink on Mesos. Ping Till for you.
>>>
>>> Thanks, vino.
>>>
>>> Martin Eden  于2018年9月12日周三 下午11:21写道:
>>>
>>>> Hi all,
>>>>
>>>> We're using Flink 1.3.2 with DCOS / Mesos.
>>>>
>>>> We have a 3 node cluster and are running the Flink DCOS package (Flink
>>>> Mesos framework) configured with 3 Task Managers.
>>>>
>>>> Our goal is to run each of them on separate hosts for better load
>>>> balancing but it seems the task managers end up running on the same host.
>>>>
>>>> Looked around the docs and DCOS Flink package but could not find any
>>>> placement policy or anything of the sorts.
>>>>
>>>> Is there anything like that?
>>>>
>>>> We are also planning to upgrade to the latest Flink version. Is
>>>> something like that supported in this newer version?
>>>>
>>>> Thanks,
>>>> M
>>>>
>>>


Re: Task managers run on separate nodes in a cluster

2018-09-14 Thread Martin Eden
Thanks Vino!

On Fri, Sep 14, 2018 at 3:37 AM vino yang  wrote:

> Hi Martin,
>
> Till has done most of the work of Flink on Mesos. Ping Till for you.
>
> Thanks, vino.
>
> Martin Eden  于2018年9月12日周三 下午11:21写道:
>
>> Hi all,
>>
>> We're using Flink 1.3.2 with DCOS / Mesos.
>>
>> We have a 3 node cluster and are running the Flink DCOS package (Flink
>> Mesos framework) configured with 3 Task Managers.
>>
>> Our goal is to run each of them on separate hosts for better load
>> balancing but it seems the task managers end up running on the same host.
>>
>> Looked around the docs and DCOS Flink package but could not find any
>> placement policy or anything of the sorts.
>>
>> Is there anything like that?
>>
>> We are also planning to upgrade to the latest Flink version. Is something
>> like that supported in this newer version?
>>
>> Thanks,
>> M
>>
>


Task managers run on separate nodes in a cluster

2018-09-12 Thread Martin Eden
Hi all,

We're using Flink 1.3.2 with DCOS / Mesos.

We have a 3 node cluster and are running the Flink DCOS package (Flink
Mesos framework) configured with 3 Task Managers.

Our goal is to run each of them on separate hosts for better load balancing
but it seems the task managers end up running on the same host.

Looked around the docs and DCOS Flink package but could not find any
placement policy or anything of the sorts.

Is there anything like that?

We are also planning to upgrade to the latest Flink version. Is something
like that supported in this newer version?

Thanks,
M


Re: 答复: Best way to find the current alive jobmanager with HA mode zookeeper

2018-08-21 Thread Martin Eden
Hi guys,

Just to close the loop, with the Flink 1.3.2 cli you have to provide the
Flink Job Manager host address in order to submit a job like so:
${FLINK_HOME}/bin/flink run -d -m ${FLINK_JOBMANAGER_ADDRESS} ${JOB_JAR}

Since we are running the DCOS Flink package we use the Marathon rest api to
fetch the FLINK_JOBMANAGER_ADDRESS which solved our problem.

We are now thinking of upgrading to the latest 1.6 release. From looking at
the cli docs and from the previous messages it seems you still need to
provide the Job Manager address explicitly. Are there any plans to support
job submission that just takes a zookeeper ensemble and zookeeperNamespace
(which is currently accepted) without having to provide explicit Job
Manager address? This would be more user friendly and would eliminate the
extra step of figuring out the Job Manager address.

Thanks,
M



On Tue, Jul 31, 2018 at 3:54 PM, Till Rohrmann  wrote:

> I think that the web ui automatically redirects to the current leader. So
> if you should access the JobManager which is not leader, then you should
> get an HTTP redirect to the current leader. Due to that it should not be
> strictly necessary to know which of the JobManagers is the leader.
>
> The RestClusterClient uses the ZooKeeperLeaderRetrievalService to
> retrieve the leader address. You could try the same. Using the
> RestClusterClient with Flink 1.4 won't work, though. Alternatively, you
> should be able to directly read the address from the leader ZNode in
> ZooKeeper.
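
For anyone finding this thread later: a minimal sketch of that
ZooKeeperLeaderRetrievalService approach could look like the code below. The
two ZooKeeperUtils factory calls moved around between 1.3/1.4/1.5, so treat
their exact signatures as assumptions to verify against your Flink version.

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;

public class JobManagerLeaderLookup {

    public static void main(String[] args) throws Exception {
        // Reads the high-availability.* settings (ZooKeeper quorum, cluster id, ...)
        // from the flink-conf.yaml in FLINK_CONF_DIR.
        Configuration config = GlobalConfiguration.loadConfiguration();

        CompletableFuture<String> leaderAddress = new CompletableFuture<>();

        LeaderRetrievalService retrievalService =
                ZooKeeperUtils.createLeaderRetrievalService(
                        ZooKeeperUtils.startCuratorFramework(config), config);

        retrievalService.start(new LeaderRetrievalListener() {
            @Override
            public void notifyLeaderAddress(String address, UUID leaderSessionId) {
                // e.g. akka.tcp://flink@host:port/user/jobmanager
                leaderAddress.complete(address);
            }

            @Override
            public void handleError(Exception e) {
                leaderAddress.completeExceptionally(e);
            }
        });

        System.out.println("Current JobManager leader: " + leaderAddress.get());
        retrievalService.stop();
    }
}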
>
> Cheers,
> Till
>
>
>
> On Thu, Jul 26, 2018 at 4:14 AM vino yang  wrote:
>
>> Hi Youjun,
>>
>> Thanks, you can try this but I am not sure if it works correctly. Because
>> for the REST Client, there are quite a few changes from 1.4 to 1.5.
>>
>> Maybe you can customize the source code in 1.4, referring to the specific
>> implementation in 1.5? Another option: upgrade your Flink version.
>>
>> To Chesnay and Till:  any suggestion or opinion?
>>
>> Thanks, vino.
>>
>> 2018-07-26 10:01 GMT+08:00 Yuan,Youjun :
>>
>>> Thanks for the information. I forgot to mention that I am using Flink 1.4; the
>>> RestClusterClient doesn't seem to have the ability to retrieve the leader
>>> address. I did notice there is a webMonitorRetrievalService member in Flink
>>> 1.5.
>>>
>>>
>>>
>>> I wonder if I can use RestClusterClient@v1.5 on my client side, to
>>> retrieve the leader JM of Flink v1.4 Cluster.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Youjun
>>>
>>>
>>>
>>> *发件人**:* vino yang 
>>> *发送时间:* Wednesday, July 25, 2018 7:11 PM
>>> *收件人:* Martin Eden 
>>> *抄送:* Yuan,Youjun ; user@flink.apache.org
>>> *主题:* Re: Best way to find the current alive jobmanager with HA mode
>>> zookeeper
>>>
>>>
>>>
>>> Hi Martin,
>>>
>>>
>>>
>>>
>>>
>>> For a standalone cluster in which multiple JM instances exist, if you do
>>> not use the REST API but use the Flink-provided cluster client, the client can
>>> perceive which of the multiple JM instances is the JM leader.
>>>
>>>
>>>
>>> For example, you can use the CLI to submit a Flink job on a non-leader node.
>>>
>>>
>>>
>>> But I did not verify this case for Flink on Mesos.
>>>
>>>
>>>
>>> Thanks, vino.
>>>
>>>
>>>
>>> 2018-07-25 17:22 GMT+08:00 Martin Eden :
>>>
>>> Hi,
>>>
>>>
>>>
>>> This is actually very relevant to us as well.
>>>
>>>
>>>
>>> We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
>>> Mesos/DCOS, Flink HA runs only one JobManager, which gets restarted on
>>> another node by Marathon in case of failure and reloads its state from
>>> ZooKeeper.
>>>
>>>
>>>
>>> Yuan I am guessing you are using Flink in standalone mode and there it
>>> is actually running 3 instances of the Job Manager, 1 active and 2
>>> stand-bys.
>>>
>>>
>>>
>>> Either way, in both cases there is the need to "discover" the hostname
>>> and port of the Job Manager at runtime. This is needed when you want to use
>>> the cli to submit jobs for instance. Is there an elegant mode to submit
>>> jobs other than say just trying out all the possible nodes in your cluster?
>>>
>>>
>>>
>>> Grateful if anyone could clarify any of the above, thanks,
>>>
>>> M
>>>
>>>
>>>
>>> On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
>>> wrote:
>>>
>>> Hi all,
>>>
>>>
>>>
>>> I have a standalone cluster with 3 jobmanagers, and set *high-availability
>>> to zookeeper*. Our client submits job by REST API(POST
>>> /jars/:jarid/run), which means we need to know the host of the any of the
>>> current alive jobmanagers. The problem is that, how can we know which job
>>> manager is alive, or the host of current leader?  We don’t want to access a
>>> dead JM.
>>>
>>>
>>>
>>> Thanks.
>>>
>>> Youjun Yuan
>>>
>>>
>>>
>>>
>>>
>>
>>


Re: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread Martin Eden
Hi,

This is actually very relevant to us as well.

We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
Mesos/DCOS, Flink HA runs only one JobManager, which gets restarted on
another node by Marathon in case of failure and reloads its state from
ZooKeeper.

Yuan, I am guessing you are using Flink in standalone mode, where it is
actually running 3 instances of the JobManager: 1 active and 2 stand-bys.

Either way, in both cases there is the need to "discover" the hostname and
port of the JobManager at runtime. This is needed, for instance, when you want
to use the CLI to submit jobs. Is there a more elegant way to submit jobs
than, say, just trying out all the possible nodes in your cluster?

Grateful if anyone could clarify any of the above, thanks,
M

On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun  wrote:

> Hi all,
>
>
>
> I have a standalone cluster with 3 jobmanagers, and set *high-availability
> to zookeeper*. Our client submits jobs via the REST API (POST /jars/:jarid/run),
> which means we need to know the host of any of the currently alive
> jobmanagers. The problem is: how can we know which job manager is
> alive, or the host of the current leader? We don't want to access a dead JM.
>
>
>
> Thanks.
>
> Youjun Yuan
>


Re: Classloader and removal of native libraries

2018-05-03 Thread Martin Eden
Hi,

I'm reviving this thread because I am probably hitting a similar issue with
loading a native library. However I am not able to resolve it with the
suggestions here.

I am using Flink 1.3.2 and the Jep library to call Cpython from a
RichFlatMapFunction with a parallelism of 10. I am instantiating the Jep
class in the open method. The Jep class has a static block where it loads
the native library:
static {
  System.loadLibrary("jep");
}

I am running Flink in standalone mode in a high availability setup with
zookeeper.

Initially I set java.library.path on the task manager JVM. The
code works; however, when I test JobManager recovery by killing and
restarting the JobManager, the redeployed job starts endlessly
failing with:

java.lang.UnsatisfiedLinkError: Native Library
/Library/Python/2.7/site-packages/jep/libjep.jnilib already loaded in
another classloader

Removing java.library.path from the task manager JVM and setting it
programmatically to a randomized path each time the RichFlatMapFunction class
is initialised (i.e. each time the job is redeployed) causes the task
manager JVM to fail upon JobManager recovery.
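
For reference, the most promising direction so far seems to be the RocksDB-style
trick Aljoscha describes further down in this thread: copy the native library
into a temp directory that is unique per (re)deployment and load it from that
absolute path, instead of relying on System.loadLibrary plus java.library.path.
A rough sketch of the idea (the library path is just my local setup, and it only
works for loading code you control, see the comments):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class NativeLibRelocation {

    // Copies the shared library into a fresh temp dir and loads it from there.
    // The JVM refuses to load the same file path from two classloaders, so a
    // redeployed job (new user classloader) loading a fresh copy under a new
    // path avoids the "already loaded in another classloader" error.
    static void loadIsolatedCopy(String sourceLib) throws Exception {
        Path tempDir = Files.createTempDirectory("flink-native-");
        Path target = tempDir.resolve(Paths.get(sourceLib).getFileName());
        Files.copy(Paths.get(sourceLib), target, StandardCopyOption.REPLACE_EXISTING);
        System.load(target.toAbsolutePath().toString());
        // Caveat: this only helps for libraries loaded via System.load(absolutePath).
        // A static System.loadLibrary("jep") block that we do not control still
        // resolves through java.library.path; libraries managed by javacpp are
        // better handled by pointing their cache property at a fresh directory
        // (I believe the property is "org.bytedeco.javacpp.cachedir", which is
        // what Conrad describes below).
    }

    public static void main(String[] args) throws Exception {
        loadIsolatedCopy("/Library/Python/2.7/site-packages/jep/libjep.jnilib");
    }
}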

Conrad, can you give more details about exactly what paths you set and how?

Any suggestions from anyone would be much appreciated.

Thanks,
M




On Fri, Aug 11, 2017 at 2:46 PM, Conrad Crampton <
conrad.cramp...@secdata.com> wrote:

> Hi Aljoscha,
>
> “Hope that helps”…
>
> ABSOLUTELY!!!
>
>
>
> I have dug through the javacpp source code to find how the Loader class
> uses the temp cache location for the native libraries and in my open method
> in my RichMapFunction I am now setting the System.property to a random
> location so if the job restarts and calls open again, it uses a random temp
> location as in your example.
>
>
>
> So thank you so much, got me on the right path. Now I’ve got a problem
> with out of memory errors arrghh (I will start another thread on this as I
> don’t want to soil this one with a different topic)
>
>
>
> Thanks again
>
> Conrad
>
>
>
> *From: *Aljoscha Krettek 
> *Date: *Thursday, 10 August 2017 at 15:57
> *To: *Conrad Crampton 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Classloader and removal of native libraries
>
>
>
> Hi Conrad,
>
>
>
> I'm afraid you're running in the same problem that we already encountered
> with loading the native RocksDB library: https://github.com/
> apache/flink/blob/219ae33d36e67e3e74f493cf4956a290bc966a5d/flink-contrib/
> flink-statebackend-rocksdb/src/main/java/org/apache/
> flink/contrib/streaming/state/RocksDBStateBackend.java#L519
>
>
>
> The code section has a good description of what is going on. We're using
> NativeLibraryLoader [1], which comes with RocksDB  to try and load the
> native library from a different temporary location if loading it the normal
> way fails. (The relocation of the lib to a temp dir is in the
> NativeLibraryLoader, not on our side. We're just providing a temp path for
> NativeLibraryLoader to work with.)
>
>
>
> Hope that helps,
>
> Aljoscha
>
>
>
> [1] https://github.com/facebook/rocksdb/blob/master/
> java/src/main/java/org/rocksdb/NativeLibraryLoader.java
>
>
>
> On 10. Aug 2017, at 15:36, Conrad Crampton 
> wrote:
>
>
>
> Hi,
>
> First time posting here so ‘hi’.
>
> I have been using Flink (1.31 now) for a couple of months now and loving
> it. My deployment is to JobManager running as a long running session on
> Yarn.
>
> I have a problem where I have a Flink streaming job that involves loading
> native libraries as part of one of the mappers (inheriting from
> RichMapFunction) loading (in the open method) a previously trained machine
> learning model (using Deeplearning4j). The problem is that when loading
> the model, it also loads some native libraries using the javacpp Loader class
> (which, from looking at the source code, determines a location for native
> libraries from a hierarchy: a System property if available, the
> user's home dir (with .javacpp), or the temp dir).
>
> Anyway, the actual problem is that if an exception is thrown in the Flink
> job, the jobmanager tries to restart it; however, it would appear that when
> it failed in the first place, references to the objects and therefore the
> classes aren't released by the classloader, as I get an error
>
>
>
> java.lang.ExceptionInInitializerError
>
> at org.nd4j.linalg.cpu.nativecpu.ops.NativeOpExecutioner.
> (NativeOpExecutioner.java:43)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> NativeConstructorAccessorImpl.java:62)
>
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>
> at 

Re: Flink on K8s job submission best practices

2017-12-22 Thread Martin Eden
The above applies to Mesos/DCOS as well. So if someone could also share
insights into automatic job deployment in that setup, that would be very useful.
Thanks.
M

On Fri, Dec 22, 2017 at 6:56 PM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi everyone,
>
> We are beginning to run Flink on K8s and found the basic templates [1] as
> well as the example Helm chart [2] very helpful. Also the discussion about
> JobManager HA [3] and Patrick's talk [4] was very interesting. All in all
> it is delightful how easy everything can be set up and works out of the box.
>
> Now we are looking for some best practices as far as job submission is
> concerned. Having played with a few alternative options, we would like to
> get some input on what other people are using. What we have looked into so
> far:
>
>1. Packaging the job jar into e.g. the JM image and submitting
>manually (either from the UI or via `kubectl exec`). Ideally, we would like
>to establish a more automated setup, preferably using native Kubernetes
>objects.
>2. Building a separate image whose responsibility it is to submit the
>job and keep it running. This could either use the API [5] or share the
>Flink config so that CLI calls connect to the existing cluster. When
>scheduling this as a Kubernetes deployment [6] and e.g. the node running
>this client pod fails, one ends up with duplicate jobs. One could build
>custom logic (poll if job exists, only submit if it does not), but this
>seems fragile and it is conceivable that this could lead to weird timing
>issues like different containers trying to submit at the same time. One
>solution would be to implement an atomic submit-if-not-exists, but I
>suppose this would need to involve some level of locking on the JM.
>3. Schedule the client container from the step above as a Kubernetes
>job [7]. This seems somewhat unidiomatic for streaming jobs that are not
>expected to terminate, but one would not have to deal with duplicate Flink
>jobs. In the failure scenario described above, the (Flink) job would still
>be running on the Flink cluster, there just would not be a client attached
>to it (as the Kubernetes job would not be restarted). On the other hand,
>should the (Flink) job fail for some reason, there is no fashion of
>restarting it automatically.
>
> Are we missing something obvious? Has the Flink community come up with a
> default way of submitting Flink jobs on Kubernetes yet or are there people
> willing to share their experiences?
>
> Best regards and happy holidays,
> Max
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/deployment/kubernetes.html
> [2] https://github.com/docker-flink/examples/tree/master/helm/flink
> [3] http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
> [4] https://www.youtube.com/watch?v=w721NI-mtAA Slides:
> https://www.slideshare.net/FlinkForward/flink-forward-
> berlin-2017-patrick-lucas-flink-in-containerland
> [5] https://ci.apache.org/projects/flink/flink-docs-
> master/monitoring/rest_api.html#submitting-programs
> [6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
> [7] https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-
> completion/
> --
> Maximilian Bode * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Martin Eden
Hi,
Not merged in yet but this is an example pr that is mocking metrics and
checking they are properly updated:
https://github.com/apache/flink/pull/4725
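
For anyone searching the archives: Chesnay's suggestion further down (a dummy
metric group plus mocked input) can be sketched roughly like this for a counter.
MyMapper is a made-up function that increments a counter named "seen" in map(),
and the stubs use Mockito; adapt as needed:

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.junit.Test;

public class CounterMetricTest {

    // Made-up function under test: counts every element it maps.
    static class MyMapper extends RichMapFunction<String, String> {
        private transient Counter seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getMetricGroup().counter("seen");
        }

        @Override
        public String map(String value) {
            seen.inc();
            return value.toUpperCase();
        }
    }

    @Test
    public void counterIsIncrementedPerRecord() throws Exception {
        // Dummy metric group: hand out a real SimpleCounter we can inspect later.
        SimpleCounter counter = new SimpleCounter();
        MetricGroup metricGroup = mock(MetricGroup.class);
        when(metricGroup.counter("seen")).thenReturn(counter);

        RuntimeContext context = mock(RuntimeContext.class);
        when(context.getMetricGroup()).thenReturn(metricGroup);

        MyMapper mapper = new MyMapper();
        mapper.setRuntimeContext(context);
        mapper.open(new Configuration());

        mapper.map("a");
        mapper.map("b");

        assertEquals(2, counter.getCount());
    }
}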


On Fri, Oct 13, 2017 at 1:49 PM, Aljoscha Krettek 
wrote:

> I think we could add this functionality to the (operator) test harnesses.
> I.e. add a mock MetricGroup thingy in there that you can query to check the
> state of metrics.
>
>
> On 13. Oct 2017, at 13:50, Chesnay Schepler  wrote:
>
> I meant that you could unit-test the behavior of the function in
> isolation. You could create a dummy metric group that
> verifies that the correct counters are being registered (based on names i
> guess), as well as provide access to them.
> Mock some input and observe whether the counter value is being modified.
>
> Whether this is a viable option depends a bit on the complexity of the
> function of course, that is, how much mocking
> you would have to do.
>
> On 13.10.2017 11:18, Piotr Nowojski wrote:
>
> For testing Flink applications in general you can read
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/
> testing.html
>
> However, as we said before, testing metrics would require using a custom or a
> JMX reporter.
>
> Yes, please report this bug in Jira.
>
> Thanks, Piotrek
>
> On 13 Oct 2017, at 04:31, Colin Williams 
> wrote:
>
> The team wants an integration test; I'm not sure what unit test you had in
> mind. Actually, I feel that I've been trying to avoid the reporter method, but
> that would be more end to end.
>
> The documentation for metrics and Scala are missing with the exception of
> Gauge: https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/monitoring/metrics.html . Should I file a issue against that?
>
> Then it leaves you guessing a little bit how to implement Counters. One
> approach tried was using objects
>
> object PointFilter extends RichMapFunction[...
>
>   @transient lazy val someCounter = 
> getRuntimeContext.getMetricGroup.counter(...)
>
>
> This allowed access to the counter before and after execution. However,
> the Counter also kept its value between the unit tests, and that's a no-go for
> the test. I think that might be an issue with ScalaTest.
>
> I've tried to get at the counter from some other directions, like trying to
> find a way to inject a reporter to get its state. But I don't see a way to
> do it. So probably the best thing to do is fire up something to collect the
> metrics from the reporter.
>
> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler 
> wrote:
>
>> Well damn, i should've read the second part of the initial mail.
>>
>> I'm wondering though, could you not unit-test this behavior?
>>
>>
>> On 12.10.2017 14:25, Chesnay Schepler wrote:
>>
>>> You could also write a custom reporter that opens a socket or similar
>>> for communication purposes.
>>>
>>> You can then either query it for the metrics, or even just trigger the
>>> verification in the reporter,
>>> and fail with an error if the reporter returns an error.
>>>
>>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>>
 Hi,

 Doing as you proposed using JMXReporter (or custom reporter) should
 work. I think there is no easier way to do this at the moment.

 Piotrek

 On 12 Oct 2017, at 04:58, Colin Williams  com> wrote:
>
> I have a RichMapFunction and I'd like to ensure Meter fields are
> properly incremented. I've been trying to think of the best way to do 
> this.
> Currently I think that I'd need to either implement my own reporter (or 
> use
> JMX) and write to a socket, create a listener and wait for the reporter to
> send the message.
>
> Is this a good approach for writing the test, or should I be
> considering something else?
>


>>>
>>>
>>
>
>
>
>


EASY Friday afternoon question: order of chained sink operator execution in a streaming task

2017-09-29 Thread Martin Eden
Hi all,

Just a quick one.

I have a task that looks like this (as printed in the logs):

17-09-29 0703510695 INFO TaskManager.info: Received task Co-Flat Map ->
Process -> (Sink: sink1, Sink: sink2, Sink: sink3) (2/2)

After looking a bit at the code of the streaming task I suppose the sink
operators are chained for each subtask and synchronously executed one after
the other in the order I specified them in code (which does correspond to
the order in the log message).

A particular subtask does something like this on one thread (just focusing
on sinks):

time  record    invocations
0     record1   sink1.invoke(record1)
1               sink2.invoke(record1)
2               sink3.invoke(record1)
3     record2   sink1.invoke(record2)
4               sink2.invoke(record2)
5               sink3.invoke(record2)
.
.
.

Is that correct?

Thanks


Re: Using latency markers

2017-09-27 Thread Martin Eden
Any follow-up on this? Jira? PR?

On Wed, Sep 13, 2017 at 11:30 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Aitozi,
>
> Yes, I think we haven’t really pin-pointed out the actual cause of the
> problem, but if you have a fix for that and can provide a PR we can
> definitely look at it! That would be helpful.
> Before opening a PR, also make sure to first open a JIRA for the issue (I
> don’t think there is one yet for this issue).
>
> Cheers,
> Gordon
>
> On 13 September 2017 at 12:14:42 PM, aitozi (gjying1...@gmail.com) wrote:
>
> Hi, Aljoscha,
>
> the dashboard showing NaN is just because the value of the latencyGauge is
> not
> numerical, so it can't be shown in the dashboard. I removed the other
> latency descriptions except the sink, so I can see the latency in the dashboard.
> Do I need to post a PR?
>
> thanks,
> Aitozi
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>


Re: Quick checkpointing related question

2017-09-08 Thread Martin Eden
Thanks for the prompt reply Stefan!

On Fri, Sep 8, 2017 at 9:25 AM, Stefan Richter <s.rich...@data-artisans.com>
wrote:

> Hi,
>
> the method is only called after the checkpoint completed on the job
> manager. At this point _all_ work for the checkpoint is done, so doing work
> in this callback does not add any overhead to the checkpoint.
>
> Best,
> Stefan
>
> > Am 08.09.2017 um 10:20 schrieb Martin Eden <martineden...@gmail.com>:
> >
> > Hi all,
> >
> > I have a Flink 1.3.1 job with a source that implements
> CheckpointingFunction.
> >
> > As I understand it, the notifyCheckpointComplete callback is called when
> all the downstream operators in the DAG successfully finished their
> checkpoints.
> >
> > Since I am doing some work in this method, I would like to know if the
> latency of the execution of this method is reflected in any of the
> checkpointing stats of the source operator? If yes which one? End To End
> Duration / Checkpoint Duration sync or async / Alignment duration?
> >
> > Thanks,
> > M
> >
> >
>
>


Quick checkpointing related question

2017-09-08 Thread Martin Eden
Hi all,

I have a Flink 1.3.1 job with a source that implements
CheckpointingFunction.

As I understand it, the notifyCheckpointComplete callback is called when
all the downstream operators in the DAG successfully finished their
checkpoints.
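
For context, the callback in question is the one on the CheckpointListener
interface; a bare-bones sketch of a source implementing it together with
CheckpointedFunction looks roughly like this (names are illustrative and the
"work" in notifyCheckpointComplete is just a placeholder print):

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<String>, CheckpointedFunction, CheckpointListener {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // emit under the checkpoint lock so snapshots see a consistent view
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("tick");
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // part of the checkpoint itself (this operator's sync/async duration)
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // restore offsets etc. on (re)start
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // invoked once the JobManager has confirmed the whole checkpoint;
        // this is where the extra work the question is about would happen
        System.out.println("checkpoint " + checkpointId + " completed, committing side effects");
    }
}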

Since I am doing some work in this method, I would like to know if the
latency of the execution of this method is reflected in any of the
checkpointing stats of the source operator? If yes which one? End To End
Duration / Checkpoint Duration sync or async / Alignment duration?

Thanks,
M


Re: dynamically partitioned stream

2017-09-07 Thread Martin Eden
Hi Tony,

Ah I see. Yes you are right. What I was saying in my last message is that I
relaxed that requirement after realising that it works how you just
described it (and Aljoscha previously) and global state is not really
feasible/possible.

Here is a re-worked example. Please let me know if it makes sense.
Basically f2 is only emitted at time 7 when we have the first values for A
and C emitted after time 4 when f2 itself was emitted.

1. Data Stream:
KEY VALUE TIME
.
.
.
C   V6    7
B   V6    7
A   V5    6
A   V4    5
*C  V3    3*
*A  V3    3*
*B  V3    3*
B   V2    2
A   V1    1

2. Control Stream:
Lambda  ArgumentKeys TIME
.
.
.
f2      [A, C]       4
f1      [A, B, C]    0

3. Expected emitted stream:
TIME    VALUE
.
.
.
7       f1(V5, V6, V3)
        f1(V5, V6, V6)
        *f2(V5, V6)*
6       f1(V5, V3, V3)
5       f1(V4, V3, V3)
4       -
*3      f1(V3, V3, V3) - or f1(V1, V2, V3) if C V3 arrives before A V3 or B V3*
2       -
1       -
0       -

I guess to wrap up this discussion there is only one more thing to clarify
about this. There is some non-determinism as to what results are emitted. I
have highlighted this in red above.

Basically the example assumes the order of events perceived in the
operators is as it is in the incoming data stream and is maintained throughout
the Flink DAG, regardless of keyBys, flatMaps, connects, etc.

However, if the order of the events is changed by Flink after the source
receives them and before they reach the first flatMap1 in the first connected
operator (i.e. the 3 events for time 3 are re-ordered), then the lambda
emits different combinations.

Questions:

1. Is what I am saying correct? Can Flink change the order of events in an
input stream as they arrive to the input of flatMap in a connected stream?
Maybe because of the key by?

2. Can this be remedied by using a different time dimension like event time
or ingest time? Would that work for simple connected streams? Does it
need to be somehow combined with a windowed operator?

To clarify, in my mind, in the example above the TIME column was really
both event time and ingest time for the data stream and control stream (in
the same order as written) and processing time for the emitted output
stream.

So the data stream was really this:

KEY VALUE EV_TIME INGEST_TIME PROCESSING_TIME
.
.
.
C  V33  33 43
A  V32  32 42
B  V31  31 41

Thanks,
M


On Thu, Sep 7, 2017 at 9:24 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> What I was talking is about how to store the arguments' state. In the
> example you explained your use case to Aljoscha.
>
> 4  f1(V4, V3, V3)
> f2(V4, V3)
> 3  f1(V3, V3, V3)
> 2  -
> 1  -
>
> You showed that when lambda f2 came, it would emit f2(V4, V3) immediately.
> However, the second argument (B, V3, 3) came before f2. You couldn't know
> how to route it at that time.
>
> If you didn't store this data's state, then f2(V4, V3) won't happen and
> the problem is easier.
> Otherwise, you would have to route all data and all lambdas to the same node to
> guarantee that every lambda won't lose any of their arguments' state.
>
> Best,
> Tony Wei
>
> 2017-09-07 14:31 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Hi Tony,
>>
>> Yes exactly I am assuming the lambda emits a value only after it has been
>> published to the control topic (t1) and at least 1 value arrives in the
>> data topic for each of it's arguments. This will happen at a time t2 > t1.
>> So yes, there is uncertainty with regards to when t2 will happen. Ideally
>> t2 - t1 ~ 0 but for our use case it is fine. Is this the correctness that
>> you are talking about? Do I have the right picture of what happens?
>>
>> Thanks
>> M
>>
>> On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> The performance is an issue, but in your case, yes, it might not be a
>>> problem if X << N.
>>>
>>> However, the other problem is where data should go in the beginning if
>>> there is no lambda been received. This problem doesn't associate with
>>> performance, but instead with correctness. If you want to keep the value
>>> state for the incoming lambda you should broadcast it to all nodes, because
>>> you would never know where the next lambda that requires this data would be
>>> routed to. Of course, you can send this data to a pre-defined node and
>>> route the lambda to this node, but this will lead 

Re: dynamically partitioned stream

2017-09-07 Thread Martin Eden
Hi Tony,

Yes exactly I am assuming the lambda emits a value only after it has been
published to the control topic (t1) and at least 1 value arrives in the
data topic for each of its arguments. This will happen at a time t2 > t1.
So yes, there is uncertainty with regards to when t2 will happen. Ideally
t2 - t1 ~ 0 but for our use case it is fine. Is this the correctness that
you are talking about? Do I have the right picture of what happens?

Thanks
M

On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> The performance is an issue, but in your case, yes, it might not be a
> problem if X << N.
>
> However, the other problem is where data should go in the beginning if
> there is no lambda been received. This problem doesn't associate with
> performance, but instead with correctness. If you want to keep the value
> state for the incoming lambda you should broadcast it to all nodes, because
> you would never know where the next lambda that requires this data would be
> routed to. Of course, you can send this data to a pre-defined node and
> route the lambda to this node, but this will lead to all data in the same
> node to let all lambda can get all required data. It is not a good solution
> because of a lack of scalability.
>
> In my origin thought, it is based on only storing state of data after you
> receive at least one lambda that requires it, so that data has its
> destination node to go. Can this assumption be acceptable in your case?
> What do you think?
>
> Best,
> Tony Wei
>
> 2017-09-06 22:41 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Hi Aljoscha, Tony,
>>
>> We actually do not need all the keys to be on all nodes where lambdas
>> are. We just need the keys that represent the data for the lambda arguments
>> to be routed to the same node as the lambda, whichever one it might be.
>>
>> Essentially in the solution we emit the data multiple times and by doing
>> that we roughly multiply the input rate by the average number of lambdas a
>> key is a part of (X). In terms of memory this is O(X * N) where N is the
>> number of keys int the data. N is the large bit. If X ~ N then we have O
>> (N^2) complexity for the Flink state. And in that case yes I see your point
>> about performance Aljoscha. But if X << N, as is our case, then we have
>> O(N) which should be manageable by Flink's distributed state mechanism
>> right? Do you see any gotchas in this new light? Are my assumptions correct?
>>
>> Thanks,
>> M
>>
>>
>>
>>
>>
>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin, Aljoscha
>>>
>>> I think Aljoscha is right. My origin thought was to keep the state only
>>> after a lambda function coming.
>>>
>>> Use Aljoscha's scenario as example, initially, all data will be
>>> discarded because there is no any lambdas. When lambda f1 [D, E] and f2
>>> [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to be
>>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>>> duplicate C, D and route these copies to machine "2".
>>>
>>> However, after reading your example again, I found what you want is a
>>> whole picture for all variables' state in a global view, so that no matter
>>> what time a new lambda comes it can always get its variables' state
>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> Hi Martin,
>>>>
>>>> I think with those requirements this is very hard (or maybe impossible)
>>>> to do efficiently in a distributed setting. It might be that I'm
>>>> misunderstanding things but let's look at an example. Assume that
>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>> key A and C also need to be router to machine "0". Now, we get a new lambda
>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>> key D and E are also routed to machine "2".
>>>>
>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>> lambdas and inputs to different m

Re: dynamically partitioned stream

2017-09-06 Thread Martin Eden
Hi Aljoscha, Tony,

We actually do not need all the keys to be on all nodes where lambdas are.
We just need the keys that represent the data for the lambda arguments to
be routed to the same node as the lambda, whichever one it might be.

Essentially in the solution we emit the data multiple times and by doing
that we roughly multiply the input rate by the average number of lambdas a
key is a part of (X). In terms of memory this is O(X * N) where N is the
number of keys int the data. N is the large bit. If X ~ N then we have O
(N^2) complexity for the Flink state. And in that case yes I see your point
about performance Aljoscha. But if X << N, as is our case, then we have
O(N) which should be manageable by Flink's distributed state mechanism
right? Do you see any gotchas in this new light? Are my assumptions correct?

Thanks,
M





On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin, Aljoscha
>
> I think Aljoscha is right. My origin thought was to keep the state only
> after a lambda function coming.
>
> Use Aljoscha's scenario as example, initially, all data will be discarded
> because there is no any lambdas. When lambda f1 [D, E] and f2 [A, C]
> comes, A, C begin to be routed to machine "0" and D, E begin to be routed
> to machine "1". Then, when we get a new lambda f3 [C, D], we can
> duplicate C, D and route these copies to machine "2".
>
> However, after reading your example again, I found what you want is a
> whole picture for all variables' state in a global view, so that no matter
> what time a new lambda comes it can always get its variables' state
> immediately. In that case, I have the same opinion as Aljoscha.
>
> Best,
> Tony Wei
>
> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi Martin,
>>
>> I think with those requirements this is very hard (or maybe impossible)
>> to do efficiently in a distributed setting. It might be that I'm
>> misunderstanding things but let's look at an example. Assume that
>> initially, we don't have any lambdas, so data can be sent to any machine
>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>> C]. Say this gets routed to machine "0", now this means that messages with
>> key A and C also need to be router to machine "0". Now, we get a new lambda
>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>> key D and E are also routed to machine "2".
>>
>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>> lambdas and inputs to different machines? They all have to go to the same
>> machine, but which one? I'm currently thinking that there would need to be
>> some component that does the routing, but this has to be global, so it's
>> hard to do in a distributed setting.
>>
>> What do you think?
>>
>> Best,
>> Aljoscha
>>
>> On 1. Sep 2017, at 07:17, Martin Eden <martineden...@gmail.com> wrote:
>>
>> This might be a way forward but since side inputs are not there I will
>> try and key the control stream by the keys in the first co flat map.
>>
>> I'll see how it goes.
>>
>> Thanks guys,
>> M
>>
>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> Yes, that is exactly what I thought.
>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>> sure how to achieve this in the current release.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:
>>>
>>>> Hi Aljoscha, Tony,
>>>>
>>>> Aljoscha:
>>>> Yes it's the first option you mentioned.
>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>>> be applied each time a new value for either A, B or C comes in. So we need
>>>> to use state to cache the latest values. So using the example data stream
>>>> in my first msg the emitted stream should be:
>>>>
>>>> 1. Data Stream:
>>>> KEY VALUE TIME
>>>> .
>>>> .
>>>> .
>>>> C  V66
>>>> B  V66
>>>> A  V55
>>>> A  V44
>>>> C  V33
>>>> A  V33
>>>> B  V33
>>>> B  V22
>>>> A  V11
>>>>
>>>> 2. Control Stream:
>>>> Lambda  ArgumentK

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
This might be a way forward but since side inputs are not there I will try
and key the control stream by the keys in the first co flat map.

I'll see how it goes.

Thanks guys,
M

On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> Yes, that is exactly what I thought.
> But the first step also needs to be fulfilled  by SideInput. I'm not sure
> how to achieve this in the current release.
>
> Best,
> Tony Wei
>
> Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:
>
>> Hi Aljoscha, Tony,
>>
>> Aljoscha:
>> Yes it's the first option you mentioned.
>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
>> applied each time a new value for either A, B or C comes in. So we need to
>> use state to cache the latest values. So using the example data stream in
>> my first msg the emitted stream should be:
>>
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C  V66
>> B  V66
>> A  V55
>> A  V44
>> C  V33
>> A  V33
>> B  V33
>> B  V22
>> A  V11
>>
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2[A, C] 4
>> f1[A, B, C]1
>>
>> 3. Expected emitted stream:
>> TIMEVALUE
>> .
>> .
>> .
>> 6  f1(V5, V6, V3)
>> f1(V5, V6, V6)
>> f2(V5, V6)
>> 5  f1(V5, V3, V3)
>> f2(V5, V3)
>> 4  f1(V4, V3, V3)
>> f2(V4, V3)
>> 3  f1(V3, V3, V3)
>> 2  -
>> 1  -
>>
>> So essentially as soon as the argument list fills up then we apply the
>> function/lambda at each new arriving message in the data stream for either
>> argument key.
>>
>> Tony:
>> Yes we need to group by and pass to the lambda.
>> Ok, so what you are proposing might work. So your solution assumes that
>> we have to connect with the control stream twice? Once for the tagging and
>> another time re-connect-ing the control stream with the tagged stream for
>> the actual application of the function/lambda?
>>
>> Thanks,
>> Alex
>>
>>
>>
>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Martin,
>>>
>>> In your original example, what does this syntax mean exactly:
>>>
>>> f1[A, B, C]1
>>>
>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>> think it's quite hard to find a partitioning such that both f1, f2, and all
>>> A, B, and C go to the same machine.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Martin,
>>>
>>> So the problem is that you want to group those arguments in Data Stream
>>> and pass them to the lambda function from Control Stream at the same time.
>>> Am I right?
>>>
>>> If right, then you could give each lambda function an id as well. Use
>>> these ids to tag those arguments to which they belong.
>>> After that, keyBy function could be used to group those arguments
>>> belonging to the same lambda function. Joining this stream with Control
>>> Stream by function id could make arguments and function be in the same
>>> instance.
>>>
>>> What do you think? Could this solution solve your problem?
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>
>>>> Thanks for your reply Tony,
>>>>
>>>> Yes we are in the latter case, where the functions/lambdas come in the
>>>> control stream. Think of them as strings containing the logic of the
>>>> function. The values for each of the arguments to the function come from
>>>> the data stream. That is why we need to co-locate the data stream messages
>>>> for the corresponding keys with the control message that has the function
>>>> to be applied.
>>>>
>>>> We have a way of interpreting the logic described

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Hi Aljoscha, Tony,

Aljoscha:
Yes it's the first option you mentioned.
Yes, the stream has multiple values in flight for A, B, C. f1 needs to be
applied each time a new value for either A, B or C comes in. So we need to
use state to cache the latest values. So using the example data stream in
my first msg the emitted stream should be:

1. Data Stream:
KEY VALUE TIME
.
.
.
C   V6    6
B   V6    6
A   V5    5
A   V4    4
C   V3    3
A   V3    3
B   V3    3
B   V2    2
A   V1    1

2. Control Stream:
Lambda  ArgumentKeys TIME
.
.
.
f2      [A, C]       4
f1      [A, B, C]    1

3. Expected emitted stream:
TIME    VALUE
.
.
.
6       f1(V5, V6, V3)
        f1(V5, V6, V6)
        f2(V5, V6)
5       f1(V5, V3, V3)
        f2(V5, V3)
4       f1(V4, V3, V3)
        f2(V4, V3)
3       f1(V3, V3, V3)
2       -
1       -

So essentially, as soon as the argument list fills up, we apply the
function/lambda at each newly arriving message in the data stream for any of
its argument keys.

Tony:
Yes we need to group by and pass to the lambda.
Ok, so what you are proposing might work. So your solution assumes that we
have to connect with the control stream twice? Once for the tagging and
another time re-connecting the control stream with the tagged stream for
the actual application of the function/lambda?
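
To make sure I understood the tagging idea correctly, here is a rough sketch of
that first connect. All the types (DataPoint, LambdaSpec, TaggedPoint) are made
up for illustration, and the key -> lambdas map is a plain operator-local
HashMap rather than Flink keyed state, which is why no keyBy is needed before
this flatMap (it would also need to be rebuilt from the control topic, or kept
in operator state, to survive failures):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class LambdaRouting {

    public static class DataPoint { public String key; public double value; }
    public static class LambdaSpec { public String lambdaId; public List<String> argumentKeys; }
    public static class TaggedPoint {
        public String lambdaId; public DataPoint point;
        public TaggedPoint(String lambdaId, DataPoint point) { this.lambdaId = lambdaId; this.point = point; }
    }

    // Step 1: broadcast the control stream so every parallel instance knows all
    // lambdas, and re-emit each data record once per lambda that uses its key.
    public static DataStream<TaggedPoint> tagByLambda(DataStream<DataPoint> data,
                                                      DataStream<LambdaSpec> control) {
        return data
            .connect(control.broadcast())
            .flatMap(new CoFlatMapFunction<DataPoint, LambdaSpec, TaggedPoint>() {
                // key -> lambdas that list this key as an argument (not checkpointed)
                private final Map<String, List<LambdaSpec>> lambdasByKey = new HashMap<>();

                @Override
                public void flatMap1(DataPoint p, Collector<TaggedPoint> out) {
                    for (LambdaSpec l : lambdasByKey.getOrDefault(p.key, new ArrayList<>())) {
                        out.collect(new TaggedPoint(l.lambdaId, p));
                    }
                }

                @Override
                public void flatMap2(LambdaSpec l, Collector<TaggedPoint> out) {
                    for (String key : l.argumentKeys) {
                        lambdasByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(l);
                    }
                }
            });
    }

    // Step 2 (the second connect, not shown): key the tagged stream and the
    // control stream by lambdaId, connect them, and in a RichCoFlatMapFunction
    // keep the latest value per argument key in keyed state, evaluating the
    // lambda whenever one of its arguments updates.
}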

Thanks,
Alex



On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Martin,
>
> In your original example, what does this syntax mean exactly:
>
> f1[A, B, C]1
>
> Does it mean that f1 needs one A, one B and one C from the main stream? If
> yes, which ones, because there are multiple As and Bs and so on. Or does it
> mean that f1 can apply to an A or a B or a C? If it's the first, then I
> think it's quite hard to find a partitioning such that both f1, f2, and all
> A, B, and C go to the same machine.
>
> Best,
> Aljoscha
>
> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Martin,
>
> So the problem is that you want to group those arguments in Data Stream
> and pass them to the lambda function from Control Stream at the same time.
> Am I right?
>
> If right, then you could give each lambda function an id as well. Use
> these ids to tag those arguments to which they belong.
> After that, keyBy function could be used to group those arguments
> belonging to the same lambda function. Joining this stream with Control
> Stream by function id could make arguments and function be in the same
> instance.
>
> What do you think? Could this solution solve your problem?
>
> Best,
> Tony Wei
>
> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Thanks for your reply Tony,
>>
>> Yes we are in the latter case, where the functions/lambdas come in the
>> control stream. Think of them as strings containing the logic of the
>> function. The values for each of the arguments to the function come from
>> the data stream. That is why we need to co-locate the data stream messages
>> for the corresponding keys with the control message that has the function
>> to be applied.
>>
>> We have a way of interpreting the logic described in the string and
>> executing it on the incoming values from the data stream. This is kicked
>> off from within the Flink runtime (synchronous to a flatMap of the
>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>> functions.
>>
>> So yeah I see your point about mapping the arguments but the problem is
>> not really that, the problem is making sure that the values in the control
>> stream are in the same instance of the task/ keyed managed state as a the
>> actual control stream message. Once they are we can pass them in.
>>
>> Any other thoughts?
>>
>> M
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com>
>> wrote:
>>
>>> Hi Martin,
>>>
>>> About problem 2. How were those lambda functions created? Pre-defined
>>> functions / operators or automatically generated based on the message from
>>> Control Stream?
>>>
>>> For the former, you could give each function one id and use flatMap to
>>> duplicate data with multiple ids. Then, you could use filter function and
>>> send them to the corresponding operators.
>>>
>>> For the general case like the latter, because you had broadcasted the
>>> messages to all tasks, it could always build a mapping table from argument
>>> keys to lambda functio

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Thanks for your reply Tony,

Yes we are in the latter case, where the functions/lambdas come in the
control stream. Think of them as strings containing the logic of the
function. The values for each of the arguments to the function come from
the data stream. That is why we need to co-locate the data stream messages
for the corresponding keys with the control message that has the function
to be applied.

We have a way of interpreting the logic described in the string and
executing it on the incoming values from the data stream. This is kicked
off from within the Flink runtime (synchronous to a flatMap of the
RichCoFlatMapFunction) but is not using Flink predefined operators or
functions.

So yeah I see your point about mapping the arguments, but the problem is not
really that; the problem is making sure that the values in the data
stream are in the same instance of the task / keyed managed state as the
actual control stream message. Once they are we can pass them in.

Any other thoughts?

M







On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> About problem 2. How were those lambda functions created? Pre-defined
> functions / operators or automatically generated based on the message from
> Control Stream?
>
> For the former, you could give each function one id and use flatMap to
> duplicate data with multiple ids. Then, you could use filter function and
> send them to the corresponding operators.
>
> For the general case like the latter, because you had broadcasted the
> messages to all tasks, it could always build a mapping table from argument
> keys to lambda functions in each sub-task and use the map to process the
> data. But I was wondering if it is possible to generate a completely new
> function in the runtime.
>
> Best,
> Tony Wei
>
> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Thanks for your reply Tony.
>>
>> So there are actually 2 problems to solve:
>>
>> 1. All control stream msgs need to be broadcasted to all tasks.
>>
>> 2. The data stream messages with the same keys as those specified in the
>> control message need to go to the same task as well, so that all the values
>> required for the lambda (i.e. functions f1, f2 ...) are there.
>>
>> In my understanding side inputs (which are actually not available in the
>> current release) would address problem 1.
>>
>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I get a
>> runtime exception telling me I still need to do a keyBy before the flatMap.
>> So are the upcoming side inputs the only way to broadcast a control stream
>> to all tasks of a coFlatMap? Or is there another way?
>>
>> As for problem 2, I am still pending a reply. Would appreciate if anyone
>> has some suggestions.
>>
>> Thanks,
>> M
>>
>>
>>
>>
>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> Let me understand your question first.
>>> You have two Stream: Data Stream and Control Stream and you want to
>>> select data in Data Stream based on the key set got from Control Stream.
>>>
>>> If I were not misunderstanding your question, I think SideInput is what
>>> you want.
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Si
>>> de+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStream
>>> API-StoringSide-InputData
>>> It lets you to define one stream as a SideInput and can be assigned to
>>> the other stream, then the data in SideInput stream will be broadcasted.
>>>
>>> So far, I have no idea if there is any solution to solve this without
>>> SideInput.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to implement the following using Flink:
>>>>
>>>> I have 2 input message streams:
>>>>
>>>> 1. Data Stream:
>>>> KEY VALUE TIME
>>>> .
>>>> .
>>>> .
>>>> C  V66
>>>> B  V66
>>>> A  V55
>>>> A  V44
>>>> C  V33
>>>> A  V33
>>>> B  V33
>>>> B  V22
>>>> A  V11
>>>>
>>>> 2. Control Stream:
>>>> Lambda  ArgumentKeys TIME
>>

Re: dynamically partitioned stream

2017-08-31 Thread Martin Eden
Thanks for your reply Tony.

So there are actually 2 problems to solve:

1. All control stream msgs need to be broadcasted to all tasks.

2. The data stream messages with the same keys as those specified in the
control message need to go to the same task as well, so that all the values
required for the lambda (i.e. functions f1, f2 ...) are there.

In my understanding side inputs (which are actually not available in the
current release) would address problem 1.

To address problem 1 I also tried
dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new
RichCoFlatMapFunction) but I get a runtime exception telling me I still
need to do a keyBy before the flatMap. So are the upcoming side inputs the
only way to broadcast a control stream to all tasks of a coFlatMap? Or is
there another way?

As for problem 2, I am still awaiting a reply. I would appreciate it if anyone
has some suggestions.

Thanks,
M




On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> Let me understand your question first.
> You have two streams, a Data Stream and a Control Stream, and you want to select
> data in the Data Stream based on the key set received from the Control Stream.
>
> If I am not misunderstanding your question, I think SideInput is what
> you want.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-
> StoringSide-InputData
> It lets you define one stream as a SideInput that can be attached to the
> other stream; the data in the SideInput stream will then be broadcast.
>
> So far, I have no idea if there is any solution to solve this without
> SideInput.
>
> Best,
> Tony Wei
>
> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Hi all,
>>
>> I am trying to implement the following using Flink:
>>
>> I have 2 input message streams:
>>
>> 1. Data Stream:
>> KEY VALUE TIME
>> .
>> .
>> .
>> C   V6    6
>> B   V6    6
>> A   V5    5
>> A   V4    4
>> C   V3    3
>> A   V3    3
>> B   V3    3
>> B   V2    2
>> A   V1    1
>>
>> 2. Control Stream:
>> Lambda  ArgumentKeys TIME
>> .
>> .
>> .
>> f2      [A, C]       4
>> f1      [A, B, C]    1
>>
>> I want to apply the lambdas coming in the control stream to the selection
>> of keys that are coming in the data stream.
>>
>> Since we have 2 streams I naturally thought of connecting them using
>> .connect. For this I need to key both of them by a certain criterion. And
>> here lies the problem: how can I make sure that the messages with keys A, B, C
>> specified in the control stream end up in the same task as the control
>> message (f1, [A, B, C]) itself? Basically I don't know what to key by to
>> achieve this.
>>
>> I suspect a custom partitioner is required that partitions the data
>> stream based on the messages in the control stream? Is this even possible?
>>
>> Any suggestions welcomed!
>>
>> Thanks,
>> M
>>
>>
>>
>>
>>
>>
>>
>


dynamically partitioned stream

2017-08-31 Thread Martin Eden
Hi all,

I am trying to implement the following using Flink:

I have 2 input message streams:

1. Data Stream:
KEY VALUE TIME
.
.
.
C   V6    6
B   V6    6
A   V5    5
A   V4    4
C   V3    3
A   V3    3
B   V3    3
B   V2    2
A   V1    1

2. Control Stream:
Lambda  ArgumentKeys TIME
.
.
.
f2      [A, C]       4
f1      [A, B, C]    1

I want to apply the lambdas coming in the control stream to the selection
of keys that are coming in the data stream.

Since we have 2 streams I naturally thought of connecting them using
.connect. For this I need to key both of them by a certain criterion. And
here lies the problem: how can I make sure that the messages with keys A, B, C
specified in the control stream end up in the same task as the control
message (f1, [A, B, C]) itself? Basically I don't know what to key by to
achieve this.

I suspect a custom partitioner is required that partitions the data stream
based on the messages in the control stream? Is this even possible?
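
To make this concrete, the rough shape I have in mind (an untested sketch
with made-up types) is to fan each control message out into one record per
argument key, co-locate it with that key's data, and then re-key by the
lambda name so that all arguments of one lambda meet in the same task; I am
not sure this is the right or idiomatic approach though:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DynamicPartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data stream: (key, value, time)
        DataStream<Tuple3<String, String, Long>> data = env.fromElements(
                Tuple3.of("A", "V1", 1L), Tuple3.of("B", "V2", 2L), Tuple3.of("C", "V3", 3L));

        // Control stream: (lambdaName, argumentKeys)
        DataStream<Tuple2<String, String[]>> control = env.fromElements(
                Tuple2.of("f1", new String[]{"A", "B", "C"}));

        // (1) fan out: one (argumentKey, lambdaName) record per argument key
        DataStream<Tuple2<String, String>> controlPerKey = control.flatMap(
                new FlatMapFunction<Tuple2<String, String[]>, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(Tuple2<String, String[]> c,
                                        Collector<Tuple2<String, String>> out) {
                        for (String key : c.f1) {
                            out.collect(Tuple2.of(key, c.f0));
                        }
                    }
                });

        // (2) co-locate data and control per data key and tag each value with the
        //     lambdas interested in it (plain HashMap for brevity; real code would
        //     use keyed state in a RichCoFlatMapFunction to be fault tolerant)
        DataStream<Tuple4<String, String, String, Long>> tagged = data.keyBy(0)
                .connect(controlPerKey.keyBy(0))
                .flatMap(new CoFlatMapFunction<Tuple3<String, String, Long>,
                        Tuple2<String, String>, Tuple4<String, String, String, Long>>() {
                    private final Map<String, Set<String>> lambdasPerKey = new HashMap<>();

                    @Override
                    public void flatMap1(Tuple3<String, String, Long> d,
                                         Collector<Tuple4<String, String, String, Long>> out) {
                        for (String lambda : lambdasPerKey.getOrDefault(d.f0, new HashSet<String>())) {
                            out.collect(Tuple4.of(lambda, d.f0, d.f1, d.f2));
                        }
                    }

                    @Override
                    public void flatMap2(Tuple2<String, String> c,
                                         Collector<Tuple4<String, String, String, Long>> out) {
                        lambdasPerKey.computeIfAbsent(c.f0, k -> new HashSet<String>()).add(c.f1);
                    }
                });

        // (3) key by lambdaName: all argument values of one lambda now share a task,
        //     so a downstream function could buffer the latest value per argument
        //     key and evaluate the lambda.
        tagged.keyBy(0).print();

        env.execute("dynamic-partition-sketch");
    }
}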

Any suggestions welcomed!

Thanks,
M


Re: metrics for Flink sinks

2017-08-30 Thread Martin Eden
Thanks Chesnay,

Just for completeness, are there any relevant tickets for the discussion
that one can follow, upvote, contribute to?

M

On Tue, Aug 29, 2017 at 8:57 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> Hello,
>
> 1. Because no one found time to fix it. Unlike the remaining
> byte/record metrics, input metrics for sources / output metrics for sinks
> have to be implemented for every single implementation with their
> respective semantics. In contrast, the other output metrics are gathered at the
> boundary between operators, independent of the actual operator
> implementation. Furthermore, this requires system metrics (i.e. metrics
> that Flink itself creates) to be exposed (and be mutable!) to user-defined
> functions, which is something I *generally* wanted to avoid, but it
> appears to be a big enough pain point to make an exception here.
>
> 2. Due to the above it is currently not possible without modifications of
> the code to know how many reads/writes were made.
>
> 3. Do you mean aggregated metrics? The web UI allows the aggregation of
> record/byte metrics on the task level. Beyond that we defer aggregation to
> actual time-series databases that specialize in these things.
>
>
> On 28.08.2017 19:08, Martin Eden wrote:
>
> Hi all,
>
> Just 3 quick questions, all related to Flink metrics, especially around
> sinks:
>
> 1. In the Flink UI, Sources always show 0 input records / bytes and Sinks
> always show 0 output records / bytes. Why is that?
>
> 2. What is the best practice for instrumenting off the shelf Flink sinks?
>
> Currently the only metrics available are num records/bytes in and out at
> the operator and task scope. For the task scope there are extra buffer
> metrics. However the output metrics are always zero (see question 1). How
> can one know the actual number of successful writes done by an off the
> shelf Flink sink? Or the latency of the write operation?
>
> 3. Is it possible to configure Flink to get global job metrics for all
> subtasks of an operator? Or are there any best practices around that?
>
> Thanks,
> M
>
>
>


metrics for Flink sinks

2017-08-28 Thread Martin Eden
Hi all,

Just 3 quick questions, all related to Flink metrics, especially around
sinks:

1. In the Flink UI, Sources always show 0 input records / bytes and Sinks
always show 0 output records / bytes. Why is that?

2. What is the best practice for instrumenting off the shelf Flink sinks?

Currently the only metrics available are num records/bytes in and out at
the operator and task scope. For the task scope there are extra buffer
metrics. However the output metrics are always zero (see question 1). How
can one know the actual number of successful writes done by an off the
shelf Flink sink? Or the latency of the write operation?
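
For reference, the kind of workaround I am currently considering is to wrap
the sink and count successful writes myself, roughly like this (just a
sketch with made-up names, not an existing Flink API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CountingSink<IN> extends RichSinkFunction<IN> {

    private final SinkFunction<IN> delegate;  // the off the shelf sink being wrapped
    private transient Counter writes;

    public CountingSink(SinkFunction<IN> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        writes = getRuntimeContext().getMetricGroup().counter("numWrites");
        if (delegate instanceof RichSinkFunction) {
            // forward lifecycle calls if the wrapped sink is rich
            ((RichSinkFunction<IN>) delegate).setRuntimeContext(getRuntimeContext());
            ((RichSinkFunction<IN>) delegate).open(parameters);
        }
    }

    @Override
    public void invoke(IN value) throws Exception {
        delegate.invoke(value);   // only counted if the delegate did not throw
        writes.inc();
    }

    @Override
    public void close() throws Exception {
        if (delegate instanceof RichSinkFunction) {
            ((RichSinkFunction<IN>) delegate).close();
        }
        super.close();
    }
}

It would be used as stream.addSink(new CountingSink<>(actualSink)), but this
feels like something that should be provided out of the box, hence the
question about best practice.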

3. Is it possible to configure Flink to get global job metrics for all
subtasks of an operator? Or are there any best practices around that?

Thanks,
M


Re: Is that possible for flink to dynamically read and change configuration?

2017-07-24 Thread Martin Eden
Hey Desheng,

Some options that come to mind:
- Cave man style: Stop and restart the job with the new config.
- Poll scenario: You could build your own thread that periodically loads
the config from the db into a cache accessible on each worker.
- Push scenario: have a config stream (based off of some queue) which you
connect to your data stream via the connect operator. In the
CoFlatMapFunction that you have to provide, you basically update Flink state
from the config flatMap, read that state from the data flatMap, and pass it
along with the data. Then the specific operator that uses the config can
always get it from the tuple that arrives alongside the data, say in the
invoke method call of a sink. Example here. A rough sketch of this last
option is below.
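
To make the push scenario concrete, a minimal sketch (made-up types and
threshold logic, keyed by metric name so each metric's config lands with its
data; error handling and the real config source are omitted):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class PushConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data stream: (metricName, value); config stream: (metricName, threshold)
        DataStream<Tuple2<String, Double>> data =
                env.fromElements(Tuple2.of("cpu", 0.93), Tuple2.of("mem", 0.41));
        DataStream<Tuple2<String, Double>> config =
                env.fromElements(Tuple2.of("cpu", 0.8));

        data.keyBy(0)
            .connect(config.keyBy(0))
            .flatMap(new RichCoFlatMapFunction<Tuple2<String, Double>,
                    Tuple2<String, Double>, String>() {
                private transient ValueState<Double> threshold;

                @Override
                public void open(Configuration parameters) {
                    threshold = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("threshold", Double.class));
                }

                @Override
                public void flatMap1(Tuple2<String, Double> d, Collector<String> out) throws Exception {
                    Double t = threshold.value();         // read the config from state
                    if (t != null && d.f1 > t) {
                        out.collect("anomaly on " + d.f0 + ": " + d.f1 + " > " + t);
                    }
                }

                @Override
                public void flatMap2(Tuple2<String, Double> c, Collector<String> out) throws Exception {
                    threshold.update(c.f1);               // update the config in state
                }
            })
            .print();

        env.execute("push-config-sketch");
    }
}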

Hope that gives u some ideas,
M


On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <
gzzhangdesh...@corp.netease.com> wrote:

> Hi all,
>
> I am now trying to implement an anomaly detection algorithm on Flink,
> which actually means implementing a Map operator that does anomaly detection
> based on time series.
> At first I want to read the configuration (like which Kafka source host to
> read the datastream from and which sink address to write data to) from
> MongoDB. It contains some system metrics I want to monitor.
>
> What I did was read the configuration from MongoDB and set it as the
> configuration of Flink.
>
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> Configuration conf = new Configuration();
>
> JSONObject jsonConfiguration = readConfiguration();
>
> conf.setInteger("period",jsonConfiguration.getInt("period"));
> conf.setDouble("percentage",jsonConfiguration.getDouble("percentage"));
> conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>
> see.getConfig().setGlobalJobParameters(conf);
>
> The "readConfiguration()" method reads the configuration from MongoDB.
>
> Just like the code I showed above, I set globalJobParameters to let all my
> workers share these parameters, including the metric I want to monitor. But
> maybe at some point I want to change the metric I want to monitor. I think
> one possible way is to dynamically (or periodically) read the configuration
> and reset the globalJobParameters to make the Flink program change the
> metric to monitor. Is that possible?
>
> Thanks
> Desheng Zhang
>
>
>


Re: AVRO Union type support in Flink

2017-07-18 Thread Martin Eden
Hey Vishnu,

For those of us on the list who are not very familiar with Flink and Avro,
can you give a pointer to the docs you are referring to and explain how you
intend to use it? Just so we gain understanding as well.

Thanks,
Martin

On Tue, Jul 18, 2017 at 9:12 PM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Hi All,
>
> Does Flink support AVRO union types? The documentation says it supports
> nullable types: {"name": "type_double_test", "type": ["null", "double"]}
>
> But my schema has something like: {"name": "union_field", "type":
> ["string", "double"]}
>
> Thanks
> Vishnu
>
>
>


Re: Disk I/O in Flink

2017-04-29 Thread Martin Eden
Hi Robert,

Any updates on the below for the community?

Thanks,
M

On Tue, Apr 25, 2017 at 8:50 AM, Robert Schmidtke 
wrote:

> Hi Ufuk, thanks for coming back to me on this.
>
> The records are 100 bytes in size, the benchmark being TeraSort, so that
> should not be an issue. I have played around with the input size, and here
> are my observations:
>
> 128 GiB input: 0 Spilling in Flink.
> 256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
> writes), and my instrumentation covers all of it.
> 384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
> 512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
> 640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
> 768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
> 896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
> 1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.
>
> So regardless of how well my system is configured and whether spilling is
> even necessary, it seems that with larger spilling amounts, the way the data
> is spilled changes (and I start missing larger and larger portions of the
> I/O, up to almost 100%).
> Now since I have written the instrumentation myself, I cannot guarantee
> that it is flawless and I might have missed something.
> I'm currently looking into how the file channels are being accessed in
> parallel by multiple threads, which I cover as well and my tests verify it,
> but maybe there are special access patterns here.
>
> Robert
>
> On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi  wrote:
>
>> Hey Robert,
>>
>> for batch that should cover the relevant spilling code. If the records
>> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
>> incoming records as well. But that should be covered by the
>> FileChannel instrumentation as well?
>>
>> – Ufuk
>>
>>
>> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
>>  wrote:
>> > Hi,
>> >
>> > I have already looked at the UnilateralSortMerger, concluding that all
>> I/O
>> > eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
>> > turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel.
>> Are
>> > there more interaction points between Flink and the underlying file
>> system
>> > that I might want to consider?
>> >
>> > Thanks!
>> > Robert
>> >
>> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young  wrote:
>> >>
>> >> Hi,
>> >>
>> >> You probably want check out UnilateralSortMerger.java, this is the
>> class
>> >> which is responsible for external sort for flink. Here is a short
>> >> description for how it works: there are totally 3 threads working
>> together,
>> >> one for reading, one for sorting partial data in memory, and the last
>> one is
>> >> responsible for spilling. Flink will first figure out how many memory
>> it can
>> >> use during the in-memory sort, and manage them as MemorySegments. Once
>> these
>> >> memory runs out, the sorting thread will take over these memory and do
>> the
>> >> in-memory sorting (For more details about in-memory sorting, you can
>> see
>> >> NormalizedKeySorter). After this, the spilling thread will write this
>> sorted
>> >> data to disk and make these memory available again for reading. This
>> will
>> >> repeated until all data has been processed.
>> >> Normally, the data will be read twice (one from source, and one from
>> disk)
>> >> and write once, but if you spilled too much files, flink will first
>> merge
>> >> some all the files and make sure the last merge step will not exceed
>> some
>> >> limit (default 128). Hope this can help you.
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
>> ro.schmid...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm currently examining the I/O patterns of Flink, and I'd like to
>> know
>> >>> when/how Flink goes to disk. Let me give an introduction of what I
>> have done
>> >>> so far.
>> >>>
>> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
>> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster,
>> each
>> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte
>> of disk.
>> >>> I'm using YARN and HDFS. The underlying file system is XFS.
>> >>>
>> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
>> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
>> counters
>> >>> again. Accumulated over the entire cluster I get 3 TiB of writes and
>> 3.2 TiB
>> >>> of reads. What I'd have expected would be 2 TiB of writes (1 for
>> TeraGen, 1
>> >>> for TeraSort) and 1 TiB of reads (during TeraSort).
>> >>>
>> >>> Unsatisfied by the coarseness of these numbers I developed an HDFS
>> >>> wrapper that logs file system statistics for each call to hdfs://...,
>> such
>> >>> as start time/end time, no. of bytes read/written etc. I can plot
>>