Re: Question about configuring Rich Functions

2017-10-13 Thread Tony Wei
Hi Steve,

I think the discussion in this thread [1] could answer your questions.

Best Regards,
Tony Wei

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RichMapFunction-parameters-in-the-Streaming-API-td16121.html
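
The linked thread is not reproduced in this archive. One approach that works in the DataStream API is to pass configuration through the function's constructor: function objects are serialized on the client and shipped to the workers, so serializable fields set in the constructor are available there. The sketch below illustrates only that mechanism with plain Java serialization. `SimpleMapFunction`, `PrefixMapper` and `ConstructorConfigDemo` are hypothetical names for this example, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable user function; NOT Flink's RichMapFunction.
interface SimpleMapFunction<I, O> extends Serializable {
    O map(I value);
}

// Configuration is passed through the constructor and kept in a serializable field.
class PrefixMapper implements SimpleMapFunction<String, String> {
    private static final long serialVersionUID = 1L;
    private final String prefix;

    PrefixMapper(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}

class ConstructorConfigDemo {
    // Serializes and deserializes the function, roughly what happens when a
    // function object is shipped from the client to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PrefixMapper shipped = roundTrip(new PrefixMapper("cfg-"));
        System.out.println(shipped.map("value")); // prints "cfg-value"
    }
}
```

In a real job the same idea would apply to a class extending a rich function, with the constructor arguments read in `open` or in the processing method as needed.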

On Sat, Oct 14, 2017 at 12:41 AM, Steve Jerman wrote:

> This document:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application
>
> describes the use of 'withParameters' to pass configuration into
> RichFunctions. Is this supported for streams?
>
>
> The .withParameters method doesn't seem to exist on stream operations
> (map, window, filter, ...).
>
>
> Is this DataSet only? If so, it might be worth clarifying in the docs. And if
> so, is there any way to pass config via the open method in streams? It seems not.
>
>
> Steve
>


Re: Flink 1.3.2 Netty Exception

2017-10-13 Thread Flavio Pompermaier
Any update on this? Do you want me to create a JIRA issue for this bug?

On 11 Oct 2017 17:14, "Ufuk Celebi"  wrote:

@Chesnay: Recycling of network resources happens after the tasks go
into state FINISHED. Since we are submitting new jobs in a local loop
here it can easily happen that the new job is submitted before enough
buffers are available again. At least, previously that was the case.

I'm CC'ing Nico who refactored the network buffer distribution
recently and who might have more details about this specific error
message.

@Nico: another question is why there seem to be more buffers available
but we don't assign them. I'm referring to this part of the error
message "5691 of 32768 bytes...".

On Wed, Oct 11, 2017 at 2:54 PM, Chesnay Schepler wrote:
> I can confirm that the issue is reproducible with the given test, from the
> command-line and IDE.
>
> While cutting down the test case, by replacing the OutputFormat with a
> DiscardingOutputFormat and the JDBCInputFormat with a simple collection, I
> stumbled onto a new Exception after ~200 iterations:
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> Caused by: java.io.IOException: Insufficient number of network buffers:
> required 4, but only 1 available. The total number of network buffers is
> currently set to 5691 of 32768 bytes each. You can increase this number by
> setting the configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>   at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:195)
>   at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:186)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:602)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On 11.10.2017 12:48, Flavio Pompermaier wrote:
>
> Hi to all,
> we wrote a small JUnit test to reproduce a memory issue we have in a Flink
> job (that seems related to Netty). At some point, usually around the 28th
> loop, the job fails with the following exception (actually we never faced
> that in production, but maybe it is related to the memory issue somehow...):
>
> Caused by: java.lang.IllegalAccessError:
> org/apache/flink/runtime/io/network/netty/NettyMessage
> at io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
> at io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
> ... 16 more
>
> The github project is https://github.com/okkam-it/flink-memory-leak and the
> JUnit test is contained in the MemoryLeakTest class (within src/main/test).
>
> Thanks in advance for any support,
> Flavio
>
>


problem scale up Flink on YARN

2017-10-13 Thread Lei Chen
Hi,

We're trying to implement a module to help autoscale our pipeline, which
is built with Flink on YARN. According to the documentation, the suggested
procedure seems to be:

1. cancel the job with a savepoint
2. start a new job with an increased number of YARN TMs and increased parallelism.

However, step 2 always gives this error:

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map
savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new
program, because the operator is not available in the new program. If you
want to allow to skip this, you can set the --allowNonRestoredState option
on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I also want to mention that the procedure works fine if the parallelism is not
changed.

The documentation does mention manual operator ID assignment. I'm just
curious: is that mandatory in my case to fix the problem I'm seeing, given
that my program doesn't change at all, so the autogenerated operator IDs
should be unchanged after the parallelism increase?

thanks,
Lei


Question about configuring Rich Functions

2017-10-13 Thread Steve Jerman
This document:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application



describes the use of 'withParameters' to pass configuration into RichFunctions. 
Is this supported for streams?


The .withParameters method doesn't seem to exist on stream operations
(map, window, filter, ...).


Is this DataSet only? If so, it might be worth clarifying in the docs. And if so,
is there any way to pass config via the open method in streams? It seems not.


Steve


Re: Regression for dataStream.rescale method from 1.2.1 to 1.3.2

2017-10-13 Thread Till Rohrmann
Hi Antoine,

this looks like a regression to me. I'll investigate how this could happen
and let you know once I find something.

Cheers,
Till

On Fri, Oct 13, 2017 at 10:16 AM, Antoine Philippot <
antoine.philip...@teads.tv> wrote:

> Hi,
>
> After migrating our project from Flink 1.2.1 to Flink 1.3.2, we noticed a
> big performance drop due to poor balancing of vertices between task managers.
>
> In our use case, we set the default parallelism to the number of task
> managers:
>   val stream: DataStream[Array[Byte]] = env
>     .addSource(new FlinkKafkaConsumer09[Array[Byte]]( ... ))
>     .name("kafkaConsumer").rescale // 1 operator / instance
>
>   val parallelism = nbTaskManagers * nbTaskSlots
>   val hydratedStream: DataStream[Message] = stream
>     .flatMap(avroDeserializer).name("AvroDeserializer").setParallelism(parallelism)
>     .flatMap(messageParser).name("MessageParser").setParallelism(parallelism)
>     .flatMap(messageHydration).name("Hydration").setParallelism(parallelism)
>     .filter(MessageFilter).name("MessageFilter").setParallelism(parallelism)
>
>   hydratedStream.rescale // 1 operator / instance
>     .addSink(kafkaSink).name("KafkaSink")
> If we take an example of 2 task managers with 4 slots per task manager,
> with Flink 1.2.1 we had for each instance:
> - 1 kafkaConsumer -> 4 mapOperators -> 1 kafkaSink
>
> But with exactly the same code with Flink 1.3.2, the sinks are all located
> on one instance:
> first instance:
> - 1 kafkaConsumer -> 4 mapOperators -> 2 kafkaSink
> second instance:
> - 1 kafkaConsumer -> 4 mapOperators -> no kafkaSink (network transfer to
> the first task manager)
>
> This behaviour is the same with more task managers, either in a local
> cluster or in a YARN cluster.
>
> Is it a bug, or should I update my code to get the same behaviour as Flink
> 1.2.1?
>


Re: Timer coalescing necessary?

2017-10-13 Thread Aljoscha Krettek
Hi,

Because of how they are triggered by the watermark, all event-time triggers 
with the same timestamp will be triggered in the same go, without interleaving 
other calls. Same is true for processing-time triggers because they "piggy 
back" on the one "physical" processing-time service trigger.

Regarding how often the ProcessingTimeService fires: as often as needed. I.e. 
we have a bunch of timers for T = 100 and some timers for T = 900. Then we will 
have a processing-time service firing at 100 and one at 900.
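
The "as often as needed" behaviour can be illustrated with a small, self-contained sketch: logical timers are grouped by timestamp, and each distinct timestamp needs one physical firing. This is only a model of the description above, not Flink's code; `PhysicalTimerDemo` and `firings` are hypothetical names, and the timers are assumed to belong to different keys (timers with the same key and timestamp would be deduplicated).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class PhysicalTimerDemo {
    // Groups logical timers by timestamp. Each distinct timestamp corresponds to
    // one physical firing, which then runs all callbacks registered for it.
    static List<String> firings(long[] timerTimestamps) {
        TreeMap<Long, Integer> callbacksPerTimestamp = new TreeMap<>();
        for (long timestamp : timerTimestamps) {
            callbacksPerTimestamp.merge(timestamp, 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        callbacksPerTimestamp.forEach(
            (timestamp, count) -> result.add("fire@" + timestamp + " -> " + count + " callbacks"));
        return result;
    }

    public static void main(String[] args) {
        // Three timers for T = 100 and two for T = 900 lead to two physical firings.
        for (String firing : firings(new long[] {100, 900, 100, 100, 900})) {
            System.out.println(firing);
        }
    }
}
```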

Best,
Aljoscha

> On 13. Oct 2017, at 15:36, Kien Truong  wrote:
> 
> Hi,
> 
> Thanks for the explanation. 
> Because timer callbacks and normal execution are not guaranteed to be
> concurrent-safe, if we have multiple timers with the same timestamp, are all
> of them run before normal execution resumes, or are they interleaved with
> normal execution?
> Also, may I ask how often the ProcessingTimeService fires?
> 
> 
> Best regards,
> 
> Kien
> On 10/13/2017 7:48 PM, Aljoscha Krettek wrote:
>> Hi,
>> 
>> This is slightly different for processing-time and event-time triggers.
>> 
>> First, event-time triggers: there are two data structures, a PriorityQueue
>> (which is implemented as a heap) of timers that is sorted by timestamp, and a
>> set of registered timers that is used for deduplication. When adding a
>> timer, we first check whether it already exists (using the set) and then add
>> it to the queue. Whenever we receive a watermark we poll from the timer
>> queue as long as the timestamp of the top timer is <= the watermark. We
>> remove the timer from the set and call the user callback.
>> 
>> For processing-time triggers it's very similar, except that we use a 
>> ProcessingTimeService instead of the watermark for advancing time. We always 
>> have one "physical" processing-time timer set at the ProcessingTimeService. 
>> When this fires we follow the same procedure as for event-time and then 
>> register a new "physical" timer for the next lowest processing-time timer.
>> 
>> In your case this would mean 3 separate internal timers, but a timer is only
>> a timestamp and a key (and a namespace). 
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 13. Oct 2017, at 13:56, Kien Truong wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> Could you clarify how the timer system works right now ?
>>> 
>>> For example, let's say I have a function F, with 3 keys that are registered 
>>> to execute at processing time T. 
>>> Would Flink maintain a single internal timer at time T, then run the 
>>> callback on all 3 keys when it's triggered ? Or there'd be 3 internal 
>>> timers that will be triggered separately at time T  ?
>>> 
>>> Best regards,
>>> 
>>> Kien
>>> On 10/13/2017 6:43 PM, Aljoscha Krettek wrote:
>>>> Hi,
>>>>
>>>> If you have multiple timers per key, then coalescing can make sense to
>>>> reduce the burden on the timer system. Coalescing them across different
>>>> keys would not be possible right now.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>>> On 13. Oct 2017, at 06:37, Kien Truong wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are having a streaming job where we use timers to implement key
>>>>> timeout for stateful functions. Should we implement coalescing logic to
>>>>> reduce the number of timer triggers, or is it not necessary with Flink?
>>>>>
>>>>> Best regards,
>>>>> Kien
>>>>>



Re: PartitionNotFoundException when running in yarn-session.

2017-10-13 Thread Niels Basjes
Hi

I did some tests and it turns out I was really overloading the cluster
which caused the problems.
I tried the timeout setting but that didn't help. Simply 'not overloading'
the system did help.

Thanks.

Niels


On Thu, Oct 12, 2017 at 10:42 AM, Ufuk Celebi  wrote:

> Hey Niels,
>
> Flink currently restarts the complete job if you have a restart
> strategy configured:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.
>
> I agree that only restarting the required parts of the pipeline is an
> important optimization. Flink has not implemented this (fully) yet but
> it's on the agenda [1] and work has already started [2].
>
> In this particular case, everything is just slow and we don't need the
> restart at all if you give the consumer a higher max timeout.
>
> Please report back when you have more info :-)
>
> – Ufuk
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>
> [2] https://issues.apache.org/jira/browse/FLINK-4256
>
> On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes  wrote:
> > Hi,
> >
> > I'm currently doing some tests to see if this info helps.
> > I was running a different high-CPU task on one of the nodes outside
> > Yarn, so I took that one out of the cluster to see if that helps.
> >
> > What I do find strange is that in this kind of error scenario the entire job
> > fails.
> > I would have expected something similar to 'good old' MapReduce: the
> > missing task is simply resubmitted and run again.
> > Why doesn't that happen?
> >
> >
> > Niels
> >
> > On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi  wrote:
> >>
> >> Hey Niels,
> >>
> >> any update on this?
> >>
> >> – Ufuk
> >>
> >>
> >> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi  wrote:
> >> > Hey Niels,
> >> >
> >> > thanks for the detailed report. I don't think that it is related to
> >> > the Hadoop or Scala version. I think the following happens:
> >> >
> >> > - Occasionally, one of your tasks seems to be extremely slow in
> >> > registering its produced intermediate result (the data shuffled
> >> > between TaskManagers)
> >> > - Another task is already requesting to consume data from this task
> >> > but cannot find it (after multiple retries) and it fails the complete
> >> > job (your stack trace)
> >> >
> >> > That happens only occasionally probably due to load in your cluster.
> >> > The slow down could have multiple reasons...
> >> > - Is your Hadoop cluster resource constrained and the tasks are slow to
> >> > deploy?
> >> > - Is your application JAR very large and needs a lot of time
> >> > downloading?
> >> >
> >> > We have two options at this point:
> >> > 1) You can increase the maximum retries via the config option:
> >> > "taskmanager.network.request-backoff.max" The default is 10000
> >> > (milliseconds) and specifies what the maximum request back off is [1].
> >> > Increasing this to 30000 would give you two extra retries with pretty
> >> > long delays (see [1]).
> >> >
> >> > 2) To be sure that this is really what is happening we could increase
> >> > the log level of certain classes and check whether they have
> >> > registered their results or not. If you want to do this, I'm more than
> >> > happy to provide you with some classes to enable DEBUG logging for.
> >> >
> >> > What do you think?
> >> >
> >> > – Ufuk
> >> >
> >> > DETAILS
> >> > ===
> >> >
> >> > - The TaskManagers produce and consume intermediate results
> >> > - When a TaskManager wants to consume a result, it directly queries
> >> > the producing TaskManager for it
> >> > - An intermediate result becomes ready for consumption during initial
> >> > task setup (state DEPLOYING)
> >> > - When a TaskManager is slow to register its intermediate result and
> >> > the consumer requests the result before it is ready, it can happen
> >> > that a requested partition is "not found"
> >> >
> >> > This is what is also happening here. We retry to request the
> >> > intermediate result multiple times with timed backoff [1] and only
> >> > fail the request (your stack trace) if the partition is still not
> >> > ready although we expect it to be ready (that is there was no failure
> >> > at the producing task).
> >> >
> >> > [1] Starting by default at 100 millis and going up to 10_000 millis by
> >> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
> >> >
> >> >
> >> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes wrote:
> >> >> Hi,
> >> >>
> >> >> I'm having some trouble running a Java-based Flink job in a
> >> >> yarn-session.
> >> >>
> >> >> The job itself consists of reading a set of files resulting in a
> >> >> DataStream (I use DataStream because in the future I intend to replace
> >> >> the file with a Kafka feed), then does some parsing and eventually
> >> >> writes the data into HBase.
> >> >>
> >> >> Most of the time running this works fine, yet sometimes it fails with
> >> >> this exception:
> >> >>
> >
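
The retry schedule described in footnote [1] of the quoted message (start at 100 ms, double each time, cap at the configured maximum) can be reproduced with a short sketch. This only models the arithmetic from the footnote and is not Flink's implementation; `RequestBackoffDemo` and `schedule` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RequestBackoffDemo {
    // Doubles the delay starting at initialMs and caps the final entry at maxMs.
    static List<Long> schedule(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        List<Long> delays = new ArrayList<>();
        long delay = initialMs;
        while (delay < maxMs) {
            delays.add(delay);
            delay *= 2;
        }
        delays.add(maxMs); // the last retry waits for the configured maximum
        return delays;
    }

    public static void main(String[] args) {
        // prints [100, 200, 400, 800, 1600, 3200, 6400, 10000]
        System.out.println(schedule(100, 10_000));
    }
}
```

Raising the maximum adds further doubled steps before the cap is reached, which is why a larger "taskmanager.network.request-backoff.max" results in more retries.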

Re: Timer coalescing necessary?

2017-10-13 Thread Kien Truong

Hi,

Thanks for the explanation.

Because timer callbacks and normal execution are not guaranteed to be
concurrent-safe, if we have multiple timers with the same timestamp, are
all of them run before normal execution resumes, or are they
interleaved with normal execution?


Also, may I ask how often the ProcessingTimeService fires?


Best regards,

Kien

On 10/13/2017 7:48 PM, Aljoscha Krettek wrote:

Hi,

This is slightly different for processing-time and event-time triggers.

First, event-time triggers: there are two data structures, a PriorityQueue (which
is implemented as a heap) of timers that is sorted by timestamp, and a set of
registered timers that is used for deduplication. When adding a timer, we first
check whether it already exists (using the set) and then add it to the queue.
Whenever we receive a watermark we poll from the timer queue as long as the
timestamp of the top timer is <= the watermark. We remove the timer from the
set and call the user callback.

For processing-time triggers it's very similar, except that we use a ProcessingTimeService instead 
of the watermark for advancing time. We always have one "physical" processing-time timer 
set at the ProcessingTimeService. When this fires we follow the same procedure as for event-time 
and then register a new "physical" timer for the next lowest processing-time timer.

In your case this would mean 3 separate internal timers, but a timer is only a
timestamp and a key (and a namespace).

Best,
Aljoscha



On 13. Oct 2017, at 13:56, Kien Truong  wrote:

Hi Aljoscha,

Could you clarify how the timer system works right now ?

For example, let's say I have a function F, with 3 keys that are registered to 
execute at processing time T.
Would Flink maintain a single internal timer at time T, then run the callback 
on all 3 keys when it's triggered ? Or there'd be 3 internal timers that will 
be triggered separately at time T  ?

Best regards,

Kien
On 10/13/2017 6:43 PM, Aljoscha Krettek wrote:

Hi,

If you have multiple timers per key, then coalescing can make sense to reduce 
the burden on the timer system. Coalescing them across different keys would not 
be possible right now.

Best,
Aljoscha



On 13. Oct 2017, at 06:37, Kien Truong wrote:

Hi,

We are having a streaming job where we use timers to implement key timeout for
stateful functions. Should we implement coalescing logic to reduce the number
of timer triggers, or is it not necessary with Flink?

Best regards,
Kien
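
Coalescing multiple timers per key, as discussed in this thread, is commonly done by rounding the target time to a coarser granularity so that several registrations collapse into one timer. The sketch below shows the arithmetic only; `TimerCoalescingDemo`, `coalesce` and `distinctTimers` are hypothetical names, the set stands in for the deduplication of registered timers described above, and timestamps are assumed to be non-negative.

```java
import java.util.Set;
import java.util.TreeSet;

class TimerCoalescingDemo {
    // Rounds a target time down to a multiple of granularityMs.
    static long coalesce(long targetMs, long granularityMs) {
        return (targetMs / granularityMs) * granularityMs;
    }

    // Counts the distinct timers a single key ends up with when every event
    // registers a timeout timer at (eventTime + timeoutMs), coalesced as above.
    static int distinctTimers(long[] eventTimesMs, long timeoutMs, long granularityMs) {
        Set<Long> timers = new TreeSet<>();
        for (long eventTime : eventTimesMs) {
            timers.add(coalesce(eventTime + timeoutMs, granularityMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] events = {1000, 1200, 1700, 2100};
        System.out.println(distinctTimers(events, 60_000, 1));    // no coalescing: 4 timers
        System.out.println(distinctTimers(events, 60_000, 1000)); // 1 s granularity: 2 timers
    }
}
```

Rounding down means a timer may fire up to one granularity interval early; rounding up instead trades that for a slightly late timeout.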



Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Martin Eden
Hi,
It is not merged yet, but this is an example PR that mocks metrics and
checks that they are properly updated:
https://github.com/apache/flink/pull/4725


On Fri, Oct 13, 2017 at 1:49 PM, Aljoscha Krettek wrote:

> I think we could add this functionality to the (operator) test harnesses.
> I.e. add a mock MetricGroup thingy in there that you can query to check the
> state of metrics.
>
>
> On 13. Oct 2017, at 13:50, Chesnay Schepler  wrote:
>
> I meant that you could unit-test the behavior of the function in
> isolation. You could create a dummy metric group that
> verifies that the correct counters are being registered (based on names, I
> guess), as well as provide access to them.
> Mock some input and observe whether the counter value is being modified.
>
> Whether this is a viable option depends a bit on the complexity of the
> function of course, that is, how much mocking
> you would have to do.
>
> On 13.10.2017 11:18, Piotr Nowojski wrote:
>
> For testing Flink applications in general you can read
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
>
> However, as we said before, testing metrics would require using a custom or a
> JMX reporter.
>
> Yes, please report this bug in Jira.
>
> Thanks, Piotrek
>
> On 13 Oct 2017, at 04:31, Colin Williams wrote:
>
> The team wants an integration test; I'm not sure what unit test you had in
> mind. Actually, I feel that I've been trying to avoid the reporter method, but
> that would be more end to end.
>
> The documentation for metrics in Scala is missing, with the exception of
> Gauge: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html .
> Should I file an issue against that?
>
> That leaves you guessing a little bit about how to implement Counters. One
> approach I tried was using objects:
>
> object PointFilter extends RichMapFunction[...
>
>   @transient lazy val someCounter = getRuntimeContext.getMetricGroup.counter(...)
>
>
> This allowed access to the counter before and after execution. However,
> the Counter also kept its value between the unit tests, and that's a no-go for
> the test. I think that might be an issue with ScalaTest.
>
> I've tried to get at the counter from some other directions, like trying to
> find a way to inject a reporter to get its state, but I don't see a way to
> do it. So probably the best thing to do is fire up something to collect the
> metrics from the reporter.
>
> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler wrote:
>
>> Well damn, i should've read the second part of the initial mail.
>>
>> I'm wondering though, could you not unit-test this behavior?
>>
>>
>> On 12.10.2017 14:25, Chesnay Schepler wrote:
>>
>>> You could also write a custom reporter that opens a socket or similar
>>> for communication purposes.
>>>
>>> You can then either query it for the metrics, or even just trigger the
>>> verification in the reporter,
>>> and fail with an error if the reporter returns an error.
>>>
>>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>>
>>>> Hi,
>>>>
>>>> Doing as you proposed using JMXReporter (or custom reporter) should
>>>> work. I think there is no easier way to do this at the moment.
>>>>
>>>> Piotrek
>>>>
>>>> On 12 Oct 2017, at 04:58, Colin Williams wrote:
>>>>>
>>>>> I have a RichMapFunction and I'd like to ensure Meter fields are
>>>>> properly incremented. I've been trying to think of the best way to do
>>>>> this. Currently I think that I'd need to either implement my own reporter
>>>>> (or use JMX) and write to a socket, create a listener and wait for the
>>>>> reporter to send the message.
>>>>>
>>>>> Is this a good approach for writing the test, or should I be
>>>>> considering something else?
>>>>>


>>>
>>>
>>
>
>
>
>


Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Aljoscha Krettek
I think we could add this functionality to the (operator) test harnesses. I.e. 
add a mock MetricGroup thingy in there that you can query to check the state of 
metrics. 
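
A dummy metric group of the kind discussed in this thread could look roughly like the sketch below. `TestCounter`, `RecordingMetricGroup` and `PositiveFilter` are hypothetical stand-ins written for this example; they are not Flink's Counter or MetricGroup interfaces and not the test-harness feature proposed above. The idea is only that the function under test receives its counter from a group that the test can query afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a counter metric.
class TestCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

// Hypothetical stand-in for a metric group that remembers registered counters by name.
class RecordingMetricGroup {
    private final Map<String, TestCounter> counters = new HashMap<>();

    // Registers (or returns) the counter with the given name.
    TestCounter counter(String name) {
        return counters.computeIfAbsent(name, ignored -> new TestCounter());
    }

    boolean isRegistered(String name) {
        return counters.containsKey(name);
    }

    long countOf(String name) {
        TestCounter counter = counters.get(name);
        if (counter == null) {
            throw new IllegalArgumentException("no counter named " + name);
        }
        return counter.getCount();
    }
}

// Example function under test: counts the values it accepts.
class PositiveFilter {
    private final TestCounter accepted;

    PositiveFilter(RecordingMetricGroup metrics) {
        this.accepted = metrics.counter("accepted");
    }

    boolean accept(int value) {
        boolean ok = value > 0;
        if (ok) {
            accepted.inc();
        }
        return ok;
    }
}

class MetricGroupDemo {
    public static void main(String[] args) {
        RecordingMetricGroup metrics = new RecordingMetricGroup();
        PositiveFilter filter = new PositiveFilter(metrics);
        filter.accept(-1);
        filter.accept(2);
        filter.accept(3);
        System.out.println(metrics.countOf("accepted")); // prints 2
    }
}
```

Because each test creates its own group and function instance, counter values do not carry over between tests.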

> On 13. Oct 2017, at 13:50, Chesnay Schepler  wrote:
> 
> I meant that you could unit-test the behavior of the function in isolation.
> You could create a dummy metric group that
> verifies that the correct counters are being registered (based on names, I
> guess), as well as provide access to them.
> Mock some input and observe whether the counter value is being modified.
>
> Whether this is a viable option depends a bit on the complexity of the
> function of course, that is, how much mocking
> you would have to do.
> 
> On 13.10.2017 11:18, Piotr Nowojski wrote:
>> For testing Flink applications in general you can read
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
>>
>> However, as we said before, testing metrics would require using a custom or a
>> JMX reporter.
>> 
>> Yes, please report this bug in Jira. 
>> 
>> Thanks, Piotrek
>> 
>>> On 13 Oct 2017, at 04:31, Colin Williams wrote:
>>>
>>> The team wants an integration test; I'm not sure what unit test you had in
>>> mind. Actually, I feel that I've been trying to avoid the reporter method, but
>>> that would be more end to end.
>>>
>>> The documentation for metrics in Scala is missing, with the exception of
>>> Gauge:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>>> . Should I file an issue against that?
>>>
>>> That leaves you guessing a little bit about how to implement Counters. One
>>> approach I tried was using objects:
>>>
>>> object PointFilter extends RichMapFunction[...
>>>   @transient lazy val someCounter = getRuntimeContext.getMetricGroup.counter(...)
>>>
>>> This allowed access to the counter before and after execution. However,
>>> the Counter also kept its value between the unit tests, and that's a no-go for
>>> the test. I think that might be an issue with ScalaTest.
>>>
>>> I've tried to get at the counter from some other directions, like trying to
>>> find a way to inject a reporter to get its state, but I don't see a way to
>>> do it. So probably the best thing to do is fire up something to collect the
>>> metrics from the reporter.
>>>
>>> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler wrote:
>>> Well damn, i should've read the second part of the initial mail.
>>> 
>>> I'm wondering though, could you not unit-test this behavior?
>>> 
>>> 
>>> On 12.10.2017 14:25, Chesnay Schepler wrote:
>>> You could also write a custom reporter that opens a socket or similar for 
>>> communication purposes.
>>> 
>>> You can then either query it for the metrics, or even just trigger the 
>>> verification in the reporter,
>>> and fail with an error if the reporter returns an error.
>>> 
>>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>> Hi,
>>> 
>>> Doing as you proposed using JMXReporter (or custom reporter) should work. I 
>>> think there is no easier way to do this at the moment.
>>> 
>>> Piotrek
>>> 
>>> On 12 Oct 2017, at 04:58, Colin Williams wrote:
>>> 
>>> I have a RichMapFunction and I'd like to ensure Meter fields are properly 
>>> incremented. I've been trying to think of the best way to do this. 
>>> Currently I think that I'd need to either implement my own reporter (or use 
>>> JMX) and write to a socket, create a listener and wait for the reporter to 
>>> send the message.
>>> 
>>> Is this a good approach for writing the test, or should I be considering 
>>> something else?
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
> 



Re: Timer coalescing necessary?

2017-10-13 Thread Aljoscha Krettek
Hi,

This is slightly different for processing-time and event-time triggers.

First, event-time triggers: there are two data structures, a PriorityQueue
(which is implemented as a heap) of timers that is sorted by timestamp, and a set
of registered timers that is used for deduplication. When adding a timer, we
first check whether it already exists (using the set) and then add it to the
queue. Whenever we receive a watermark we poll from the timer queue as long as
the timestamp of the top timer is <= the watermark. We remove the timer from
the set and call the user callback.

For processing-time triggers it's very similar, except that we use a 
ProcessingTimeService instead of the watermark for advancing time. We always 
have one "physical" processing-time timer set at the ProcessingTimeService. 
When this fires we follow the same procedure as for event-time and then 
register a new "physical" timer for the next lowest processing-time timer.

In your case this would mean 3 separate internal timers, but a timer is only a
timestamp and a key (and a namespace). 
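
The event-time procedure described above (a priority queue ordered by timestamp, a set for deduplication, and polling while the head timer is at or below the watermark) can be modelled in a few lines. This is a simplified sketch and not Flink's internal timer service: `EventTimeTimerDemo` and `TimerEntry` are hypothetical names, the namespace is omitted, and ties are broken by key only to make the firing order deterministic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

class EventTimeTimerDemo {
    // A timer is only a timestamp and a key (namespace omitted in this sketch).
    static final class TimerEntry implements Comparable<TimerEntry> {
        final long timestamp;
        final String key;

        TimerEntry(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(TimerEntry other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TimerEntry)) {
                return false;
            }
            TimerEntry other = (TimerEntry) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    private final PriorityQueue<TimerEntry> queue = new PriorityQueue<>();
    private final Set<TimerEntry> registered = new HashSet<>();

    // Returns false if an identical timer (same key and timestamp) already exists.
    boolean register(long timestamp, String key) {
        TimerEntry timer = new TimerEntry(timestamp, key);
        if (!registered.add(timer)) {
            return false;
        }
        queue.add(timer);
        return true;
    }

    // Fires every timer whose timestamp is <= watermark and returns them in firing order.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            TimerEntry timer = queue.poll();
            registered.remove(timer);
            fired.add(timer.toString()); // stands in for the user callback
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerDemo timers = new EventTimeTimerDemo();
        timers.register(100, "a");
        timers.register(100, "a"); // deduplicated
        timers.register(100, "b");
        timers.register(100, "c");
        System.out.println(timers.advanceWatermark(100)); // prints [a@100, b@100, c@100]
    }
}
```

With three keys registered for the same time, the queue holds three separate entries, which matches the "3 separate internal timers" answer above.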

Best,
Aljoscha


> On 13. Oct 2017, at 13:56, Kien Truong  wrote:
> 
> Hi Aljoscha,
> 
> Could you clarify how the timer system works right now ?
> 
> For example, let's say I have a function F, with 3 keys that are registered 
> to execute at processing time T. 
> Would Flink maintain a single internal timer at time T, then run the callback 
> on all 3 keys when it's triggered ? Or there'd be 3 internal timers that will 
> be triggered separately at time T  ?
> 
> Best regards,
> 
> Kien
> On 10/13/2017 6:43 PM, Aljoscha Krettek wrote:
>> Hi,
>> 
>> If you have multiple timers per key, then coalescing can make sense to 
>> reduce the burden on the timer system. Coalescing them across different keys 
>> would not be possible right now.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 13. Oct 2017, at 06:37, Kien Truong wrote:
>>> 
>>> Hi,
>>> 
>>> We are having a streaming job where we use timers to implement key timeout
>>> for stateful functions. Should we implement coalescing logic to reduce the
>>> number of timer triggers, or is it not necessary with Flink?
>>> 
>>> Best regards,
>>> Kien
>>> 



Re: Submitting a job via command line

2017-10-13 Thread Piotr Nowojski
Good to hear that :)


> On 13 Oct 2017, at 14:40, Alexander Smirnov  wrote:
> 
> Thank you so much, it helped!
> 
> From: Piotr Nowojski
> Date: Thursday, October 12, 2017 at 6:00 PM
> To: Alexander Smirnov <asmir...@five9.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Submitting a job via command line
>
> Have you tried this
> http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E
> ?
>
> Piotrek
>
>> On 12 Oct 2017, at 16:30, Alexander Smirnov wrote:
>> 
>> Hello All,
>>  
>> I got the following error while attempting to execute a job via command line:
>> 
>> [root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob /vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
>> Cluster configuration: Standalone cluster with JobManager at flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123
>> Using address flink01.pb.lx-draskin5.five9.com:6123 to connect to JobManager.
>> JobManager web interface address http://flink01.pb.lx-draskin5.five9.com:8081
>> Starting execution of program
>> Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com:6123/user/jobmanager#-1899708478] with leader session id ----.
>>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
>>     at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
>>     at com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
>>     at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
>>     at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>>     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>>     at org.apache.flink.client.prog

Re: Submitting a job via command line

2017-10-13 Thread Alexander Smirnov
Thank you so much, it helped!

From: Piotr Nowojski <pi...@data-artisans.com>
Date: Thursday, October 12, 2017 at 6:00 PM
To: Alexander Smirnov <asmir...@five9.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Submitting a job via command line

Have you tried this?
http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E

Piotrek

On 12 Oct 2017, at 16:30, Alexander Smirnov <asmir...@five9.com> wrote:

Hello All,

I got the following error while attempting to execute a job via command line:

[root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob /vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
Cluster configuration: Standalone cluster with JobManager at flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123
Using address flink01.pb.lx-draskin5.five9.com:6123 to connect to JobManager.
JobManager web interface address http://flink01.pb.lx-draskin5.five9.com:8081
Starting execution of program
Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com:6123/user/jobmanager#-1899708478] with leader session id ----.


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
    at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
    at com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
    at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
    ... 25 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
    at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)

Re: Timer coalescing necessary?

2017-10-13 Thread Kien Truong

Hi Aljoscha,

Could you clarify how the timer system works right now?

For example, let's say I have a function F with 3 keys, each with a timer
registered to fire at processing time T.

Would Flink maintain a single internal timer at time T and run the
callback on all 3 keys when it triggers? Or would there be 3 internal
timers, each triggered separately at time T?



Best regards,

Kien

On 10/13/2017 6:43 PM, Aljoscha Krettek wrote:

Hi,

If you have multiple timers per key, then coalescing can make sense to reduce 
the burden on the timer system. Coalescing them across different keys would not 
be possible right now.

Best,
Aljoscha


On 13. Oct 2017, at 06:37, Kien Truong  wrote:

Hi,

We have a streaming job where we use timers to implement key timeouts for
stateful functions. Should we implement coalescing logic to reduce the number
of timer triggers, or is that not necessary with Flink?

Best regards,
Kien


Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Chesnay Schepler
I meant that you could unit-test the behavior of the function in
isolation. You could create a dummy metric group that
verifies that the correct counters are being registered (based on names,
I guess), as well as provide access to them.

Mock some input and observe whether the counter value is being modified.

Whether this is a viable option depends a bit on the complexity of the
function of course, that is, on how much mocking you would have to do.
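
The idea of a dummy metric group can be sketched without any Flink dependency. The block below is only an illustration under stated assumptions: `CounterLike`, `RecordingMetricGroup`, and `PointFilterLogic` are hypothetical stand-ins invented for this sketch, not Flink's `Counter` or `MetricGroup` interfaces, and the filtering rule is made up. It shows the test shape only: register a counter by name, feed mocked input, then read the count back.

```java
import java.util.HashMap;
import java.util.Map;

public class MetricsUnitTestSketch {

    /** Hypothetical stand-in for a metrics counter; NOT Flink's interface. */
    interface CounterLike {
        void inc();
        long getCount();
    }

    /** Dummy "metric group" that records counters by name so a test can read them back. */
    static class RecordingMetricGroup {
        private final Map<String, long[]> counts = new HashMap<>();

        CounterLike counter(String name) {
            // One mutable cell per registered name.
            final long[] cell = counts.computeIfAbsent(name, n -> new long[1]);
            return new CounterLike() {
                public void inc() { cell[0]++; }
                public long getCount() { return cell[0]; }
            };
        }

        boolean isRegistered(String name) { return counts.containsKey(name); }

        long count(String name) { return counts.get(name)[0]; }
    }

    /** Hypothetical function under test: keeps non-negative values and counts dropped ones. */
    static class PointFilterLogic {
        private final CounterLike dropped;

        PointFilterLogic(RecordingMetricGroup group) {
            this.dropped = group.counter("dropped");
        }

        boolean accept(int value) {
            if (value < 0) {
                dropped.inc();
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        // A fresh group per test keeps counter state from leaking between tests.
        RecordingMetricGroup group = new RecordingMetricGroup();
        PointFilterLogic logic = new PointFilterLogic(group);
        for (int v : new int[] {3, -1, 7, -5}) {
            logic.accept(v);
        }
        System.out.println(group.isRegistered("dropped") + " " + group.count("dropped")); // prints "true 2"
    }
}
```

Creating the group and the function instance inside each test, rather than sharing one instance across tests, is what keeps the counts independent in this sketch.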

On 13.10.2017 11:18, Piotr Nowojski wrote:
For testing Flink applications in general you can read
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html

However, as we said before, testing metrics would require using a custom
or a JMX reporter.


Yes, please report this bug in Jira.

Thanks, Piotrek

On 13 Oct 2017, at 04:31, Colin Williams wrote:


The team wants an integration test; I'm not sure what unit test you had
in mind. I actually feel that I've been trying to avoid the reporter
method, but that would be more end to end.

The Scala documentation for metrics is missing, with the exception of
Gauge:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
Should I file an issue against that?

That leaves you guessing a little bit how to implement Counters.
One approach I tried was using objects:

object PointFilter extends RichMapFunction[...
   @transient lazy val someCounter = getRuntimeContext.getMetricGroup.counter(...)

This allowed access to the counter before and after execution.
However, the Counter also kept its value between the unit tests, and
that's a no for the test. I think that might be an issue with ScalaTest.

I've tried to get at the counter from some other directions, like
trying to find a way to inject a reporter to get its state, but I
don't see a way to do it. So probably the best thing to do is fire up
something to collect the metrics from the reporter.


On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler wrote:

Well damn, I should've read the second part of the initial mail.

I'm wondering though, could you not unit-test this behavior?


On 12.10.2017 14:25, Chesnay Schepler wrote:

You could also write a custom reporter that opens a socket or
similar for communication purposes.

You can then either query it for the metrics, or even just
trigger the verification in the reporter,
and fail with an error if the reporter returns an error.

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom
reporter) should work. I think there is no easier way to
do this at the moment.

Piotrek

On 12 Oct 2017, at 04:58, Colin Williams
<colin.williams.seat...@gmail.com> wrote:

I have a RichMapFunction and I'd like to ensure Meter
fields are properly incremented. I've been trying to
think of the best way to do this. Currently I think
that I'd need to either implement my own reporter (or
use JMX) and write to a socket, create a listener and
wait for the reporter to send the message.

Is this a good approach for writing the test, or
should I be considering something else?



Re: Timer coalescing necessary?

2017-10-13 Thread Aljoscha Krettek
Hi,

If you have multiple timers per key, then coalescing can make sense to reduce 
the burden on the timer system. Coalescing them across different keys would not 
be possible right now.

Best,
Aljoscha

> On 13. Oct 2017, at 06:37, Kien Truong  wrote:
> 
> Hi,
> 
> We have a streaming job where we use timers to implement key timeouts
> for stateful functions. Should we implement coalescing logic to reduce the
> number of timer triggers, or is that not necessary with Flink?
> 
> Best regards,
> Kien
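
The per-key coalescing mentioned in the reply can be illustrated with a small, Flink-free sketch. It assumes, as the Flink documentation describes, that at most one timer is kept per key and timestamp, so rounding each timer's target time to a coarser granularity lets nearby timers for the same key collapse into one. The class and method names below (`TimerCoalescingSketch`, `coalesce`, `distinctTimers`) are hypothetical and exist only for this example; in a real job the rounded value would be what gets passed to the timer registration call.

```java
import java.util.TreeSet;

public class TimerCoalescingSketch {

    // Round a target time down to a multiple of the granularity so that
    // nearby targets collapse onto the same timestamp.
    static long coalesce(long targetMillis, long granularityMillis) {
        return (targetMillis / granularityMillis) * granularityMillis;
    }

    // Count the distinct timer timestamps a single key would end up with
    // if one timer were registered per event at (event time + timeout).
    static int distinctTimers(long[] eventTimesMillis, long timeoutMillis, long granularityMillis) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long t : eventTimesMillis) {
            timers.add(coalesce(t + timeoutMillis, granularityMillis));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 1_250L, 1_999L, 2_100L};
        // Full-second granularity: targets 61000, 61250, 61999 and 62100
        // collapse onto 61000 and 62000.
        System.out.println(distinctTimers(eventTimes, 60_000L, 1_000L)); // prints 2
        // Millisecond granularity leaves every target distinct.
        System.out.println(distinctTimers(eventTimes, 60_000L, 1L));     // prints 4
    }
}
```

Rounding down means a timer can fire up to one granularity interval early; for a timeout that must never fire early, rounding up would be the safer variant.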



Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-13 Thread Shankara
Hi Piotrek,

I was checking the JobManager machine's logs and the dashboard, but the
output string was actually recorded in the TaskManager machine's log file.
I added InfluxDB and verified that the received data is being written into
InfluxDB.

   Thank you very much for your support.


Thanks,
Shankara



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Fwd: Decouple Kafka partitions and Flink parallelism for ordered streams

2017-10-13 Thread Sanne de Roever
Hi Chesnay,

/** Forwarding this to the group; I mistakenly replied to you directly
previously, apologies. */

The side output option works in combination with setting slot sharing
groups. For reference I have included a source file. The job takes three
slots: one slot for input handling, and one slot for each of the two maps. In
this setup the messages are still processed in order per sub-stream.

In the web client the slots are not really visible, except for the used
slots count. A possible feature could be to color the tasks according to
the slot groups they are on.

This opens up new possibilities, and more importantly decouples Kafka
configuration from Flink configuration. Thank you very much for your input!

Cheers,

Sanne

package org.example

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * This example shows an implementation of WordCount with data from a text socket.
  * To run the example make sure that the service providing the text data is
  * already up and running.
  *
  * To start an example socket text stream on your local machine run netcat from
  * a command line, where the parameter specifies the port number:
  *
  * {{{
  *   nc -lk <port>
  * }}}
  *
  * Usage:
  * {{{
  *   SocketTextStreamWordCount <hostname> <port>
  * }}}
  *
  * This example shows how to:
  *
  *   - use StreamExecutionEnvironment.socketTextStream
  *   - write a simple Flink Streaming program in scala.
  *   - write and use user-defined functions.
  */
object SocketTextStreamWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt
    val outputTag1 = OutputTag[String]("side-1")
    val outputTag2 = OutputTag[String]("side-2")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    // Read lines from the socket, split them into words, and route each word
    // to one of two side outputs based on its first character.
    val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
    val counts = text.flatMap {
      _.toLowerCase.split("\\W+") filter {
        _.nonEmpty
      }
    }
      .process(new ProcessFunction[String, String] {
        override def processElement(
            value: String,
            ctx: ProcessFunction[String, String]#Context,
            out: Collector[String]): Unit = {
          if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
          else ctx.output(outputTag2, String.valueOf(value))
        }
      })

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

    // Each side output is counted in its own slot sharing group.
    val output1 = sideOutputStream1.map {
      (_, 1)
    }.slotSharingGroup("map1")
      .keyBy(0)
      .sum(1)

    val output2 = sideOutputStream2.map {
      (_, 1)
    }.slotSharingGroup("map2")
      .keyBy(0)
      .sum(1)

    output1.print()
    output2.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

}



On Thu, Oct 12, 2017 at 12:09 PM, Sanne de Roever  wrote:

> Hi Chesnay,
>
> Thanks for confirming the challenge and putting in the time to help out;
> enlightening.
>
> I wasn't aware of the Async I/O API in Flink, that looks promising; there
> might be some deployment questions to balance the number of slots vs the
> concurrency on machines.
>
> The side-output is also promising: the concurrency remains transparent
> deployment-wise.
>
> Cheers,
>
> Sanne
>
> On Wed, Oct 11, 2017 at 5:36 PM, Chesnay Schepler wrote:
>
>> I couldn't find a proper solution for this. The easiest solution might be
>> to use the Async I/O, and do the validation with an ExecutorService or
>> similar in the map function.
>>
>> I've CC'd aljoscha, maybe he has another idea.
>>
>> The local partitioning solution is, theoretically, not impossible to do,
>> but i

Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Piotr Nowojski
For testing Flink applications in general you can read
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html

However, as we said before, testing metrics would require using a custom or a
JMX reporter.

Yes, please report this bug in Jira. 

Thanks, Piotrek

> On 13 Oct 2017, at 04:31, Colin Williams wrote:
> 
> The team wants an integration test; I'm not sure what unit test you had in
> mind. I actually feel that I've been trying to avoid the reporter method, but
> that would be more end to end.
> 
> The Scala documentation for metrics is missing, with the exception of Gauge:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
> Should I file an issue against that?
> 
> That leaves you guessing a little bit how to implement Counters. One
> approach I tried was using objects:
> 
> object PointFilter extends RichMapFunction[...
>   @transient lazy val someCounter = getRuntimeContext.getMetricGroup.counter(...)
> 
> This allowed access to the counter before and after execution. However, the
> Counter also kept its value between the unit tests, and that's a no for
> the test. I think that might be an issue with ScalaTest.
> 
> I've tried to get at the counter from some other directions, like trying to
> find a way to inject a reporter to get its state, but I don't see a way to do
> it. So probably the best thing to do is fire up something to collect the
> metrics from the reporter.
> 
> On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler wrote:
> Well damn, I should've read the second part of the initial mail.
> 
> I'm wondering though, could you not unit-test this behavior?
> 
> 
> On 12.10.2017 14:25, Chesnay Schepler wrote:
> You could also write a custom reporter that opens a socket or similar for 
> communication purposes.
> 
> You can then either query it for the metrics, or even just trigger the 
> verification in the reporter,
> and fail with an error if the reporter returns an error.
> 
> On 12.10.2017 14:02, Piotr Nowojski wrote:
> Hi,
> 
> Doing as you proposed using JMXReporter (or custom reporter) should work. I 
> think there is no easier way to do this at the moment.
> 
> Piotrek
> 
> On 12 Oct 2017, at 04:58, Colin Williams wrote:
> 
> I have a RichMapFunction and I'd like to ensure Meter fields are properly 
> incremented. I've been trying to think of the best way to do this. Currently 
> I think that I'd need to either implement my own reporter (or use JMX) and 
> write to a socket, create a listener and wait for the reporter to send the 
> message.
> 
> Is this a good approach for writing the test, or should I be considering 
> something else?
> 
> 
> 
> 
> 



Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-13 Thread Piotr Nowojski
Hi,

What version of Flink are you using? In earlier 1.3.x releases there were some
bugs in the Kafka consumer code.
Could you change the log level in Flink to debug?
Did you check the Kafka logs for some hint maybe?
I guess that metrics like bytes read/input records of this Flink application are
not changing?

Piotrek

> On 13 Oct 2017, at 07:51, Shankara  wrote:
> 
> Hi,
> 
> I mean the same code works fine in a local Flink setup. I am able to see
> "Received Message  from testkafka Topic : " on the console when Kafka
> receives a message (the Kafka producer is on another machine and frequently
> sends messages to the testkafka topic).
> 
> *Submitted the Beam application to flink local by below command :*
> mvn compile exec:java
> -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead  -Pflink-runner
> 
> *Output is :*
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608]
> with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
> 10/13/2017 11:09:09   Job execution switched to status RUNNING.
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> *Received in Deserilize..
> Received Message  from testkafka Topic : HELLOASA*
> 
> 
> 
> If I run the same code in a Flink cluster, I cannot see any message in
> log/stdout, but the job runs continuously while the Kafka producer on the
> other machine frequently sends messages to the testkafka topic.
> 
>  * I started flink cluster by below command : *
>   bin/start-cluster.sh
> 
>   *Submitted the Beam application to flink cluster by below command :*
>  bin/flink run -c org.apache.beam.influxdb.KafkaRead
> /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
> --runner=FlinkRunner --flinkMaster=192.168.1.116
> --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
> 
> 
>   In the dashboard, I cannot see any message.
> 
>   As per log Job execution is running :
> Cluster configuration: Standalone cluster with JobManager at
> /192.168.1.116:6123
> Using address 192.168.1.116:6123 to connect to JobManager.
> JobManager web interface address http://192.168.1.116:8081
> Starting execution of program
> Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job
> completion.
> Connected to JobManager at
> Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with
> leader session id ----.
> 10/13/2017 11:10:57   Job execution switched to status RUNNING.
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:11:05   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> 
>   There is no exception in the log. I suspect the Kafka deployment has an issue.
> 
> Can you please help me check it?
> 
> 
> 
> 
> public static void main(String[] args) {
>     Pipeline p = initializePipeline(args);
>     Map> intelliOmIms = new TreeMap<>();
> 
>     PTransform>> reader;
>     reader = KafkaIO.read()
>         // Kafka zookeeper and server running
>         .withBootstrapServers("192.168.1.116:9092")
>         .withTopic("kafkatest")
>         .withKeyDeserializer(IntegerDeserializer.class)
>         .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
>         .withoutMetadata();
> 
>     PCollection> output = p.apply(reader);
>     output.apply(ParDo.of(new PrintMsg()));
> 
>     p.run().waitUntilFinish();
> }
> 
> public static class PrintMsg extends DoFn, Void> {
> 
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         try {
>             System.out.println("Received Message  from testkafka Topic : "
>                 + new String(c.element().getValue(), "UTF-8"));
>         } catch (UnsupportedEncodingException e) {
>             e.printStackTrace();
>         }
>     }
> }
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Regression for dataStream.rescale method from 1.2.1 to 1.3.2

2017-10-13 Thread Antoine Philippot
Hi,

After migrating our project from Flink 1.2.1 to Flink 1.3.2, we noticed a
big performance drop due to poor balancing of vertices between task managers.

In our use case, we set the default parallelism to the number of task
managers:
  val stream: DataStream[Array[Byte]] = env
    .addSource(new FlinkKafkaConsumer09[Array[Byte]]( ... ))
    .name("kafkaConsumer").rescale // 1 operator / instance

  val parallelism = nbTaskManagers * nbTaskSlots
  val hydratedStream: DataStream[Message] = stream
    .flatMap(avroDeserializer).name("AvroDeserializer").setParallelism(parallelism)
    .flatMap(messageParser).name("MessageParser").setParallelism(parallelism)
    .flatMap(messageHydration).name("Hydration").setParallelism(parallelism)
    .filter(MessageFilter).name("MessageFilter").setParallelism(parallelism)

  hydratedStream.rescale // 1 operator / instance
    .addSink(kafkaSink).name("KafkaSink")

If we take an example of 2 task managers with 4 slots per task manager,
with Flink 1.2.1 we had for each instance:
- 1 kafkaConsumer -> 4 mapOperators -> 1 kafkaSink

But with exactly the same code on Flink 1.3.2, the sinks are all located
on one instance:
first instance:
- 1 kafkaConsumer -> 4 mapOperators -> 2 kafkaSink
second instance:
- 1 kafkaConsumer -> 4 mapOperators -> no kafkaSink (network transfer to
the first task manager)

This behaviour is the same with more task managers, either in a local
cluster or in a YARN cluster.

Is it a bug, or should I update my code to get the same behaviour as Flink
1.2.1?