Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Aljoscha Krettek
Hi,
could you please try adding this custom watermark debugger to see what's
going on with the element timestamps and watermarks:

public static class WatermarkDebugger<T>
extends AbstractStreamOperator<T> implements
OneInputStreamOperator<T, T> {
private static final long serialVersionUID = 1L;

@Override
public void processElement(StreamRecord<T> element) throws Exception {
System.out.println("ELEMENT: " + element);
output.collect(element);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
System.out.println("WM: " + mark);
}
}

you can use it like this:
input.transform("WatermarkDebugger", input.getType(), new
WatermarkDebugger>());

That should give us something to work with.

Cheers,
Aljoscha

On Mon, 5 Dec 2016 at 18:54 Robert Metzger  wrote:

I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
best overview over the changes to the window operator between 1.1. and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

I forgot to mention : the watermark extractor is the one included in Flink
API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI :

Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Tuple3<Long, ...>>() {
    @Override
    public long extractAscendingTimestamp(Tuple3<Long, ...> tuple3) {
        return tuple3.f0;
    }
})

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger :

Hi Yassine,
are you sure your watermark extractor is the same between the two versions?
It sounds a bit like the watermarks for the 1.2 code are not generated
correctly.

Regards,
Robert


On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI  wrote:

Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
in memory and the window results are not emitted until the whole stream is
processed. Is this a temporary behaviour due to the developments in
1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

env.setParallelism(1);

DataStream sessions = env
.readTextFile()
.flatMap()
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
.keyBy(1)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.apply().setParallelism(32)

sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

Best,
Yassine


Re: separation of JVMs for different applications

2016-12-05 Thread Manu Zhang
Thanks Stephan,

They don't use YARN now but I think they will consider it.  Do you think it
would be beneficial to provide such an option as "separate-jvm" in
stand-alone mode for streaming processor and long running services ? Or do
you think it would introduce too much complexity ?

Manu

On Tue, Dec 6, 2016 at 1:04 AM Stephan Ewen  wrote:

> Hi!
>
> Are your customers using YARN? In that case, the default configuration
> will start a new YARN application per Flink job, no JVMs are shared between
> jobs. By default, even each slot has its own JVM.
>
> Greetings,
> Stephan
>
> PS: I think the "spawning new JVMs" is what Till referred to when saying
> "spinning up a new cluster". Keep in mind that Flink is also a batch
> processor, and it handles sequences of short batch jobs (as issued for
> example by interactive shells) and it pre-allocates and manages a lot of
> memory for batch jobs.
>
>
>
> On Mon, Dec 5, 2016 at 3:48 PM, Manu Zhang 
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job.
>
>
> I don't think we have to spin up a new cluster for each job if every job
> gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
> new job when free slots are available. How can we share data between jobs
> and why ?
>
>
>
> On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann 
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job. This
> might be helpful for scenarios where you want to run many short-lived and
> light-weight jobs.
>
> But the important part is that you don't have to use this method. You can
> also start a new Flink cluster per job which will then execute the job
> isolated from any other jobs (given that you don't submit other jobs to
> this cluster).
>
> Cheers,
> Till
>
> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang 
> wrote:
>
> Thanks Fabian and Till.
>
> We have customers who are interested in using Flink but very concerned
> about that "multiple jobs share the same set of TMs". I've just joined the
> community recently so I'm not sure whether there has been a discussion over
> the "multi-tenant cluster mode" before.
>
> The cons are one job/user's failure may crash another, which is
> unacceptable in a multi-tenant scenario.
> What are the pros? Do the pros outweigh the cons?
>
> Manu
>
> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann  wrote:
>
> Hi Manu,
>
> with Flip-6 we will be able to support stricter application isolation by
> starting for each job a dedicated JobManager which will execute its tasks
> on TM reserved solely for this job. But at the same time we will continue
> supporting the multi-tenant cluster mode where tasks belonging to multiple
> jobs share the same set of TMs and, thus, might share information between
> them.
>
> Cheers,
> Till
>
> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske  wrote:
>
> Hi Manu,
>
> As far as I know, there are no plans to change the stand-alone deployment.
> FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
> etc.) which allow starting Flink processes per job.
>
> Till (in CC) is more familiar with the FLIP-6 effort and might be able to
> add more detail.
>
> Best,
> Fabian
>
> 2016-12-01 4:16 GMT+01:00 Manu Zhang :
>
> Hi all,
>
> It seems tasks of different Flink applications can end up in the same JVM
> (TaskManager) in standalone mode. Isn't this fragile since errors in one
> application could crash another ? I checked FLIP-6
>  
> but
> didn't find any mention of changing it in the future.
>
> Any thoughts or have I missed anything ?
>
> Thanks,
> Manu Zhang
>
>
>
>
>
>
>


Fwd: Default restart behavior with checkpointing

2016-12-05 Thread Rohit Agarwal
Hi,

https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/fault_tolerance.html
says:

Number of retries: The setNumberOfExecutionRetries() method defines how
many times the job is restarted after a failure. When checkpointing is
activated, but this value is not explicitly set, the job is restarted
infinitely often.

It also says:
The default restart strategy is set via Flink’s configuration file
flink-conf.yaml. The configuration parameter restart-strategy defines which
strategy is taken. Per default, the no-restart strategy is used.
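
[Editor's note: for reference, a minimal sketch (assuming the Flink 1.1 RestartStrategies API; the intervals and attempt counts are placeholders) of enabling checkpointing and setting a restart strategy explicitly instead of relying on the default:]

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 seconds (placeholder interval).
        env.enableCheckpointing(10_000);
        // Restart at most 3 times, waiting 10 seconds between attempts,
        // rather than relying on the checkpointing-dependent default.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000));
        // ... build the job and call env.execute() here.
    }
}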

What is the default restart behavior when checkpointing is used?

--
Rohit Agarwal


Re: Resource under-utilization when using RocksDb state backend

2016-12-05 Thread Robert Metzger
Another Flink user using RocksDB with large state on SSDs recently posted
this video on optimizing the performance of RocksDB on SSDs:
https://www.youtube.com/watch?v=pvUqbIeoPzM
That could be relevant for you.
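
[Editor's note: in code, picking the SSD-tuned predefined options mentioned later in this thread would look roughly like this (a sketch only, assuming the RocksDBStateBackend API and its predefined option profiles; the checkpoint URI is a placeholder):]

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbSsdOptionsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder checkpoint URI; point this at your checkpoint directory.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Predefined profile that tunes RocksDB for flash/SSD storage.
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend(backend);
    }
}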

For how long did you look at iotop? It could be that the IO access happens
in bursts, depending on how data is cached.

I'll also add Stefan Richter to the conversation; maybe he has some more
ideas about what we can do here.


On Mon, Dec 5, 2016 at 6:19 PM, Cliff Resnick  wrote:

> Hi Robert,
>
> We're following 1.2-SNAPSHOT,  using event time. I have tried "iotop" and
> I see usually less than 1 % IO. The most I've seen was a quick flash here
> or there of something substantial (e.g. 19%, 52%) then back to nothing. I
> also assumed we were disk-bound, but to use your metaphor I'm having
> trouble finding any smoke. However, I'm not very experienced in sussing out
> IO issues so perhaps there is something else I'm missing.
>
> I'll keep investigating. If I continue to come up empty then I guess my
> next steps may be to stage some independent tests directly against RocksDb.
>
> -Cliff
>
>
> On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger 
> wrote:
>
>> Hi Cliff,
>>
>> which Flink version are you using?
>> Are you using Eventtime or processing time windows?
>>
>> I suspect that your disks are "burning" (= your job is IO bound). Can you
>> check with a tool like "iotop" how much disk IO Flink is producing?
>> Then, I would set this number in relation with the theoretical maximum of
>> your SSD's (a good rough estimate is to use dd for that).
>>
>> If you find that your disk bandwidth is saturated by Flink, you could
>> look into tuning the RocksDB settings so that it uses more memory for
>> caching.
>>
>> Regards,
>> Robert
>>
>>
>> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick  wrote:
>>
>>> In tests comparing RocksDb to fs state backend we observe much lower
>>> throughput, around 10x slower. While the lowered throughput is expected,
>>> what's perplexing is that machine load is also very low with RocksDb,
>>> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
>>> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
>>> each running a single TaskManager in YARN, with 6.5G allocated memory per
>>> TaskManager. The instances also have 2x40G attached SSDs which we have
>>> mapped to `taskmanager.tmp.dir`.
>>>
>>> With FS state and 4 slots per TM, we will easily max out with an average
> load average around 5 or 6, so we actually need to throttle down the slots to
>>> 3. With RocksDb using the Flink SSD configured options we see a load
>>> average at around 1. Also, load (and actual) throughput remain more or less
>>> constant no matter how many slots we use. The weak load is spread over all
>>> CPUs.
>>>
>>> Here is a sample top:
>>>
>>> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>>  0.0%st
>>> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>>  0.0%st
>>> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>>>  0.0%st
>>> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>>>  0.0%st
>>>
>>> Our pipeline uses tumbling windows, each with a ValueState keyed to a
>>> 3-tuple of one string and two ints. Each ValueState comprises a small set
>>> of tuples around 5-7 fields each. The WindowFunction simply diffs against
>>> the set and updates state if there is a diff.
>>>
>>> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>>>
>>> -Cliff
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


RE: Equivalent of Rx combineLatest() on a join?

2016-12-05 Thread denis.dollfus
Actually that doesn't work as expected because emitted values are not purged. 
I'll experiment with purging triggers and/or evictors, though I have the 
feeling that Flink was not designed for what we need to do here -- but I'll 
keep on searching.
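
[Editor's note: one thing that might be worth trying (a sketch only, reusing the placeholder names from the snippet quoted below and assuming Flink's PurgingTrigger wrapper) is to purge the window contents after every firing:]

// Sketch: wrapping the count trigger in PurgingTrigger purges the GlobalWindow
// contents after each firing instead of letting them accumulate.
s3 = s1.join(s2)
    .where(new KeySelector1()).equalTo(new KeySelector2())
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(1)))
    .apply(new MyJoinFunction()); // MyJoinFunction stands in for the JoinFunction placeholder below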

In the meantime any advice is appreciated. If the goal is not clear I can 
provide more details.

Thank you,

Denis

From: Dollfus, Denis (TR Technology & Ops)
Sent: lundi 5 décembre 2016 16:31
To: user@flink.apache.org
Subject: RE: Equivalent of Rx combineLatest() on a join?

Asking the question helped me find the answer (yes, rubber duck debugging),
as it seems that the code below does what I need:

s3 = s1.join(s2)
.where(new KeySelector1()).equalTo(new KeySelector2())
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1))
.apply(new JoinFunction);

If that's a common use case (in my view it is), a syntax shortcut could help 
developers, e.g. something like:

s3 = s1.join(s2)
.where(new KeySelector1()).equalTo(new KeySelector2())
.combineLatest(new JoinFunction);

Denis


From: Dollfus, Denis (TR Technology & Ops)
Sent: lundi 5 décembre 2016 12:27
To: user@flink.apache.org
Subject: Equivalent of Rx combineLatest() on a join?

Hi all,

[first email here, I'm new to Flink, Java and Scala, sorry if I missed 
something obvious]

I'm exploring Flink in the context of streaming calculators. Basically, the 
data flow boils down to multiple data streams with variable update rates (ms, 
seconds, ..., month) which are joined before being fed to calculators. The kind 
of operation I need is very similar to the Rx combineLatest operator, which
results in an object being emitted whenever one of the streams is updated.

As there is no such operator predefined, I think I have to use a GlobalWindow 
and provide a custom WindowAssigner. The end result would look like this 
(pseudo java 8 code, I hope it's understandable):

DataStream s1 = env.addSource(..);
DataStream s2 = env.addSource(..);

S3 = s1.join(s2)
.where(s1 -> id)
.equalTo(s2 -> id)
.window(new MyCustomCombineLatestAssigner())
.apply( ... return new object combining data from s1 and from 
s2);

Is the approach correct, or is there a simpler way to achieve the same join + 
apply mechanism ?

Thank you,

Denis







Re: Resource under-utilization when using RocksDb state backend

2016-12-05 Thread Cliff Resnick
Hi Robert,

We're following 1.2-SNAPSHOT,  using event time. I have tried "iotop" and I
see usually less than 1 % IO. The most I've seen was a quick flash here or
there of something substantial (e.g. 19%, 52%) then back to nothing. I also
assumed we were disk-bound, but to use your metaphor I'm having trouble
finding any smoke. However, I'm not very experienced in sussing out IO
issues so perhaps there is something else I'm missing.

I'll keep investigating. If I continue to come up empty then I guess my
next steps may be to stage some independent tests directly against RocksDb.

-Cliff


On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger  wrote:

> Hi Cliff,
>
> which Flink version are you using?
> Are you using Eventtime or processing time windows?
>
> I suspect that your disks are "burning" (= your job is IO bound). Can you
> check with a tool like "iotop" how much disk IO Flink is producing?
> Then, I would set this number in relation with the theoretical maximum of
> your SSD's (a good rough estimate is to use dd for that).
>
> If you find that your disk bandwidth is saturated by Flink, you could look
> into tuning the RocksDB settings so that it uses more memory for caching.
>
> Regards,
> Robert
>
>
> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick  wrote:
>
>> In tests comparing RocksDb to fs state backend we observe much lower
>> throughput, around 10x slower. While the lowered throughput is expected,
>> what's perplexing is that machine load is also very low with RocksDb,
>> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
>> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
>> each running a single TaskManager in YARN, with 6.5G allocated memory per
>> TaskManager. The instances also have 2x40G attached SSDs which we have
>> mapped to `taskmanager.tmp.dir`.
>>
>> With FS state and 4 slots per TM, we will easily max out with an average
>> load average around 5 or 6, so we actually need to throttle down the slots to
>> 3. With RocksDb using the Flink SSD configured options we see a load
>> average at around 1. Also, load (and actual) throughput remain more or less
>> constant no matter how many slots we use. The weak load is spread over all
>> CPUs.
>>
>> Here is a sample top:
>>
>> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>>  0.0%st
>> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>>  0.0%st
>>
>> Our pipeline uses tumbling windows, each with a ValueState keyed to a
>> 3-tuple of one string and two ints. Each ValueState comprises a small set
>> of tuples around 5-7 fields each. The WindowFunction simply diffs against
>> the set and updates state if there is a diff.
>>
>> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>>
>> -Cliff
>>
>>
>>
>>
>>
>>
>>
>


Re: separation of JVMs for different applications

2016-12-05 Thread Stephan Ewen
Hi!

Are your customers using YARN? In that case, the default configuration will
start a new YARN application per Flink job, no JVMs are shared between
jobs. By default, even each slot has its own JVM.

Greetings,
Stephan

PS: I think the "spawning new JVMs" is what Till referred to when saying
"spinning up a new cluster". Keep in mind that Flink is also a batch
processor, and it handles sequences of short batch jobs (as issued for
example by interactive shells) and it pre-allocates and manages a lot of
memory for batch jobs.



On Mon, Dec 5, 2016 at 3:48 PM, Manu Zhang  wrote:

> The pro for the multi-tenant cluster mode is that you can share data
>> between jobs and you don't have to spin up a new cluster for each job.
>
>
> I don't think we have to spin up a new cluster for each job if every job
> gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
> new job when free slots are available. How can we share data between jobs
> and why ?
>
>
>
> On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann 
> wrote:
>
>> The pro for the multi-tenant cluster mode is that you can share data
>> between jobs and you don't have to spin up a new cluster for each job. This
>> might be helpful for scenarios where you want to run many short-lived and
>> light-weight jobs.
>>
>> But the important part is that you don't have to use this method. You can
>> also start a new Flink cluster per job which will then execute the job
>> isolated from any other jobs (given that you don't submit other jobs to
>> this cluster).
>>
>> Cheers,
>> Till
>>
>> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang 
>> wrote:
>>
>>> Thanks Fabian and Till.
>>>
>>> We have customers who are interested in using Flink but very concerned
>>> about that "multiple jobs share the same set of TMs". I've just joined the
>>> community recently so I'm not sure whether there has been a discussion over
>>> the "multi-tenant cluster mode" before.
>>>
>>> The cons are one job/user's failure may crash another, which is
>>> unacceptable in a multi-tenant scenario.
>>> What are the pros? Do the pros outweigh the cons?
>>>
>>> Manu
>>>
>>> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Manu,

 with Flip-6 we will be able to support stricter application isolation
 by starting for each job a dedicated JobManager which will execute its
 tasks on TM reserved solely for this job. But at the same time we will
 continue supporting the multi-tenant cluster mode where tasks belonging to
 multiple jobs share the same set of TMs and, thus, might share information
 between them.

 Cheers,
 Till

 On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske 
 wrote:

 Hi Manu,

 As far as I know, there are no plans to change the stand-alone
 deployment.
 FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
 etc.) which allow starting Flink processes per job.

 Till (in CC) is more familiar with the FLIP-6 effort and might be able
 to add more detail.

 Best,
 Fabian

 2016-12-01 4:16 GMT+01:00 Manu Zhang :

 Hi all,

 It seems tasks of different Flink applications can end up in the same
 JVM (TaskManager) in standalone mode. Isn't this fragile since errors in
 one application could crash another ? I checked FLIP-6
 
  but
 didn't find any mention of changing it in the future.

 Any thoughts or have I missed anything ?

 Thanks,
 Manu Zhang




>>
>


Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler

Hey Daniel,

the fix won't make it into 1.1.4 since it is only relevant if you're
using Flink Meters together with either the Graphite or Ganglia Reporter.
The Meter metric is, however, not available in 1.1 at all, so it can't be
the underlying cause.

My fix is only for 1.2; the fixed issue could have caused the behavior.

Now, for clarification, the "metrics-meter-tick-thread-X" threads are 
not created by Flink.
With Meter's being out of the picture i thus think this is not an issue 
of Flink's

metric system.

Instead, I believe Kafka may be the culprit; I found a similar
description here:

https://issues.apache.org/jira/browse/KAFKA-1521

Which Kafka version are you using? Kafka internally also uses the
DropWizard metrics library, and a particular version (2.2.0) of that is
apparently known to be leaking threads.


Regards,
Chesnay

On 05.12.2016 17:30, Ufuk Celebi wrote:

Quick question since the Meter issue does _not_ apply to 1.1.3, which Flink 
metrics are you using?

– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:

Hello,
  
Thank you all for the kind reply.
  
I've got the general idea. I am using Flink version 1.1.3.
  
So it seems the fix for Meters won't make it into 1.1.4?
  
Best Regards,
  
Daniel Santos
  
  
On 12/05/2016 01:28 PM, Chesnay Schepler wrote:

We don't have to include it in 1.1.4 since Meter's do not exist in
1.1; my bad for tagging it in JIRA for 1.1.4.

On 05.12.2016 14:18, Ufuk Celebi wrote:

Just to note that the bug mentioned by Chesnay does not invalidate
Stefan's comments. ;-)

Chesnay's issue is here:
https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into
the upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org)
wrote:

Hello Daniel,
I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, causing the underlying dropwizard meter update threads to
not be shutdown either.
I've opened a JIRA
and will open a PR soon.
Thank you for reporting this issue.
Regards,
Chesnay
On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan
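
[Editor's note: as an illustration of the above (a sketch only, not the poster's actual code; the Aerospike client usage is assumed), closing the client in close() lets task cancellation unblock the operator thread:]

import com.aerospike.client.AerospikeClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hedged sketch: open the external client in open() and close it in close(),
// so that cancellation releases blocking IO resources held by the task.
public class AerospikeLookupMap extends RichMapFunction<String, String> {
    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) {
        // Host and port are placeholders.
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public String map(String value) {
        // The actual lookup against the client would go here.
        return value;
    }

    @Override
    public void close() {
        if (client != null) {
            client.close(); // unblocks pending IO and lets the task thread terminate
        }
    }
}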


Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads are of 3 sources.
*
**OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)


Locked ownable synchronizers:
- None
*
**Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)

at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)

at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)

at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)

at

Re: JVM Non Heap Memory

2016-12-05 Thread Ufuk Celebi
Quick question since the Meter issue does _not_ apply to 1.1.3, which Flink 
metrics are you using?

– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:
> Hello,
>  
> Thank you all for the kind reply.
>  
> I've got the general idea. I am using Flink version 1.1.3.
>  
> So it seems the fix for Meters won't make it into 1.1.4?
>  
> Best Regards,
>  
> Daniel Santos
>  
>  
> On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
> > We don't have to include it in 1.1.4 since Meter's do not exist in
> > 1.1; my bad for tagging it in JIRA for 1.1.4.
> >
> > On 05.12.2016 14:18, Ufuk Celebi wrote:
> >> Just to note that the bug mentioned by Chesnay does not invalidate
> >> Stefan's comments. ;-)
> >>
> >> Chesnay's issue is here:
> >> https://issues.apache.org/jira/browse/FLINK-5261
> >>
> >> I added an issue to improve the documentation about cancellation
> >> (https://issues.apache.org/jira/browse/FLINK-5260).
> >>
> >> Which version of Flink are you using? Chesnay's fix will make it into
> >> the upcoming 1.1.4 release.
> >>
> >>
> >> On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org)
> >> wrote:
> >>> Hello Daniel,
> >>> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> >>> cleaned up, causing the underlying dropwizard meter update threads to
> >>> not be shutdown either.
> >>> I've opened a JIRA
> >>> and will open a PR soon.
> >>> Thank you for reporting this issue.
> >>> Regards,
> >>> Chesnay
> >>> On 05.12.2016 12:05, Stefan Richter wrote:
>  Hi Daniel,
> 
>  the behaviour you observe looks like some threads are not canceled.
>  Thread cancelation in Flink (and Java in general) is always
>  cooperative, where cooperative means that the thread you want to
>  cancel should somehow check cancelation and react to it. Sometimes
>  this also requires some effort from the client that wants to cancel a
>  thread. So if you implement e.g. custom operators or functions with
>  aerospike, you must ensure that they a) react on cancelation and b)
>  cleanup their resources. If you do not consider this, your aerospike
>  client might stay in a blocking call forever, in particular blocking
>  IO calls are prone to this. What you need to ensure is that
>  cancelation from the clients includes closing IO resources such as
>  streams to unblock the thread and allow for termination. This means
>  that you need your code must (to a certain degree) actively
>  participate in Flink's task lifecycle. In Flink 1.2 we introduce a
>  feature called CloseableRegistry, which makes participating in this
>  lifecycle easier w.r.t. closing resources. For the time being, you
>  should check that Flink’s task cancelation also causes your code to
>  close the aerospike client and check cancelation flags.
> 
>  Best,
>  Stefan
> 
> > Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:
> >
> > Hello,
> >
> > I have done some threads checking and dumps. And I have disabled the
> > checkpointing.
> >
> > Here are my findings.
> >
> > I did a thread dump a few hours after I booted up the whole cluster.
> > (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> >
> > The dump shows that most threads are of 3 sources.
> > *
> > **OutputFlusher --- 634 -- Sleeping State*
> >
> > "OutputFlusher" - Thread t@4758
> > java.lang.Thread.State: TIMED_WAITING
> > at java.lang.Thread.sleep(Native Method)
> > at
> > org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >   
> >
> >
> > Locked ownable synchronizers:
> > - None
> > *
> > **Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
> > being used ) -- Parked State*
> >
> > "metrics-meter-tick-thread-1" - Thread t@29024
> > java.lang.Thread.State: TIMED_WAITING
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)  
> >
> > at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)  
> >
> > at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >   
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >   
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >   
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >   
> >
> > at
> > 

Variable Tuple Type

2016-12-05 Thread Max Kießling
Hey,

for a project we need to represent data as lists. So each entry in the
DataSets basically holds a list of basic data type elements. When
processing the data we keep joining lists of the same shape and so the
list size grows over time

e.g. (a,b,c) x (c,d,e) -> (a,b,c,d,e)

Currently our solution basically is
to use a `DataSet` whose elements are lists.

The problem with this is, that the performance seems to be quite poor
compared to using tuples. When we compare the same job using either
Tuples or Lists, Tuples seem to be 2-10 times faster.

However since we can't know in advance how many elements the list will
have for a given job, using the Tuple0-25 would be both cumbersome and
complex, especially if the list size outgrows 25.
Using the Record class, the results look promising, but using Tuples is
still twice as fast.

In general our tests yield that the runtime is
Tuple < Array < Record < List

So my question is, do you see a possible way to create a variable length
tuple type which can grow almost indefinitely while keeping most of the
benefits of the TupleXX classes but skipping lots of the overhead of
Record (like keeping track of possible null values, etc.)?
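
[Editor's note: for what it's worth, a minimal sketch of one possible middle ground. The class name is made up, long is used only as an example element type, and the assumption is that a plain POJO wrapping a primitive long[] is acceptable, so that Flink's array serialization is used instead of generic List serialization:]

// Hypothetical sketch: a POJO wrapping a primitive array, so lists of arbitrary
// length are serialized as arrays rather than as generic java.util.Lists.
public class LongVector {
    public long[] values;          // public field so Flink treats this as a POJO

    public LongVector() {}         // no-arg constructor required for POJO types

    public LongVector(long[] values) {
        this.values = values;
    }

    // Concatenate two vectors, e.g. when joining (a,b,c) with (c,d,e);
    // deduplicating the shared element is left out for brevity.
    public LongVector concat(LongVector other) {
        long[] merged = new long[values.length + other.values.length];
        System.arraycopy(values, 0, merged, 0, values.length);
        System.arraycopy(other.values, 0, merged, values.length, other.values.length);
        return new LongVector(merged);
    }
}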

Thanks a lot
Best Max


Re: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device

2016-12-05 Thread Miguel Coimbra
Hello Fabian,

Thanks for the attention. Still haven't solved this.
I did set up a cron job to clean the Docker images daily - thanks for that
hint.
As a last resort, I am going to look into a 2 TB NAS to see if this works.

What is confusing me is that this happens also for the com-orkut.ungraph.txt
dataset which is much smaller than com-friendster.ungraph.txt but not that
much bigger than the com-dblp.ungraph.txt.

DBLP - I am able to run the DBLP on one TaskManager container.
https://snap.stanford.edu/data/com-DBLP.html
Nodes 317080  ~0.3 M
Edges 1049866 ~ 1 M

Orkut - no disk space error.
https://snap.stanford.edu/data/com-Orkut.html
Nodes 3072441 ~3 M
Edges 117185083 ~ 117 M

​Friendster - no disk space error.
https://snap.stanford.edu/data/com-Friendster.html
Nodes 65608366 ~65 M
Edges 1806067135 ~ 1800 M

For testing purposes, I'm using a JobManager (in its own Docker container),
a single TaskManager (in its own Docker container) with the following
config parameters:

Heap is currently configured to 6 GB:
taskmanager.heap.mb: 6000

Parallelism is set as such:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1

It is my understanding that if I want to test for example N = 3
TaskManagers (each in its own Docker container) with minimum parallelism
within each, I would use:

taskmanager.numberOfTaskSlots: 1
parallelism.default: 3


Fabian, do you think you could help estimate how much disk space would be
required to compute the Orkut data set for example?
I am running a Flink 1.1.3 Docker cluster with a single TaskManager.
This is the code I am using to read SNAP datasets and to test with Orkut,
Friendster and DBLP, in case you have a minute to inspect it and see if
something is amiss:

public class App {
public static void main(String[] args) {
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
final String dataPath = args[0];

final DataSet<Tuple2<Long, Long>> edgeTuples =
    env.readCsvFile(dataPath)
        .fieldDelimiter("\t") // node IDs are separated by tabs
        .ignoreComments("#")  // comments start with "#"
        .types(Long.class, Long.class);

// Dealing with an undirected graph, so we call .getUndirected() at
the end.
final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet(
    edgeTuples,
    new MapFunction<Long, Long>() {
        private static final long serialVersionUID = 8713516577419451509L;
        private long test = 1L;
        public Long map(Long value) {
            return value;
        }
    },
    env
).getUndirected();


try {
// Generate a unique ID value for each vertex.
// Based on
https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/MusicProfiles.java
DataSet<Tuple2<Long, Long>> idsWithInitialLabels =
    DataSetUtils.zipWithUniqueId(graph.getVertexIds())
        .map(
            new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                private static final long serialVersionUID = -6348050104902440929L;

                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                    return new Tuple2<Long, Long>(tuple2.f1, tuple2.f0);
                }
            }
        );

// Build the graph with initialization values.
final Graph<Long, Long, NullValue> graphWithIDs = graph
    .joinWithVertices(idsWithInitialLabels,
        new VertexJoinFunction<Long, Long>() {
            private static final long serialVersionUID = -315275119763760820L;
            public Long vertexJoin(Long vertexValue, Long inputValue) {
                return inputValue;
            }
        });

// Execute LabelPropagation over it.
DataSet<Vertex<Long, Long>> result = graphWithIDs.run(new
    LabelPropagation<Long, Long, NullValue>(10));

graph.getVertices().print();

TimeUnit.SECONDS.sleep(2);

System.out.println("graphWithIDs");
graphWithIDs.getVertices().print();
graphWithIDs.getEdges().print();

TimeUnit.SECONDS.sleep(2);

// Group vertices by similar communities.
final List<Vertex<Long, Long>> collected = result.collect();
final HashMap<Long, ArrayList<Long>> commSizes = new
    HashMap<Long, ArrayList<Long>>();
for (Vertex<Long, Long> v : collected) {
    //System.out.println("collected[v] = id:" + v.getId() + "\tval:" + v.getValue());
    if (!commSizes.containsKey(v.getValue())) {
        commSizes.put(v.getValue(), new ArrayList<Long>());
    }

Re: JVM Non Heap Memory

2016-12-05 Thread Daniel Santos

Hello,

Thank you all for the kind reply.

I've got the general idea. I am using Flink version 1.1.3.

So it seems the fix for Meters won't make it into 1.1.4?

Best Regards,

Daniel Santos


On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
We don't have to include it in 1.1.4 since Meter's do not exist in 
1.1; my bad for tagging it in JIRA for 1.1.4.


On 05.12.2016 14:18, Ufuk Celebi wrote:
Just to note that the bug mentioned by Chesnay does not invalidate 
Stefan's comments. ;-)


Chesnay's issue is here: 
https://issues.apache.org/jira/browse/FLINK-5261


I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).


Which version of Flink are you using? Chesnay's fix will make it into 
the upcoming 1.1.4 release.



On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) 
wrote:

Hello Daniel,
  I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, causing the underlying dropwizard meter update threads to
not be shutdown either.
  I've opened a JIRA
and will open a PR soon.
  Thank you for reporting this issue.
  Regards,
Chesnay
  On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that you need your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan


Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads are of 3 sources.
*
**OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164) 



Locked ownable synchronizers:
- None
*
**Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 


at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) 


at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) 


at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) 


at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) 


at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 


at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 


at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 


at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None
*
*

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
*

"tend" - Thread t@29011
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in
RichSink .

Streaming job B runs with a parallel of 24 and runs Aerospike in
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

Batch Job runs Aerospike Client in RichFilterFunction /
RichMapFunction with open and close methods in order to open and
close the client.

Next thing 

Re: separation of JVMs for different applications

2016-12-05 Thread Manu Zhang
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job.


I don't think we have to spin up a new cluster for each job if every job
gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
new job when free slots are available. How can we share data between jobs
and why ?



On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann  wrote:

> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job. This
> might be helpful for scenarios where you want to run many short-lived and
> light-weight jobs.
>
> But the important part is that you don't have to use this method. You can
> also start a new Flink cluster per job which will then execute the job
> isolated from any other jobs (given that you don't submit other jobs to
> this cluster).
>
> Cheers,
> Till
>
> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang 
> wrote:
>
>> Thanks Fabian and Till.
>>
>> We have customers who are interested in using Flink but very concerned
>> about that "multiple jobs share the same set of TMs". I've just joined the
>> community recently so I'm not sure whether there has been a discussion over
>> the "multi-tenant cluster mode" before.
>>
>> The cons are one job/user's failure may crash another, which is
>> unacceptable in a multi-tenant scenario.
>> What are the pros? Do the pros outweigh the cons?
>>
>> Manu
>>
>> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Manu,
>>>
>>> with Flip-6 we will be able to support stricter application isolation by
>>> starting for each job a dedicated JobManager which will execute its tasks
>>> on TM reserved solely for this job. But at the same time we will continue
>>> supporting the multi-tenant cluster mode where tasks belonging to multiple
>>> jobs share the same set of TMs and, thus, might share information between
>>> them.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske 
>>> wrote:
>>>
>>> Hi Manu,
>>>
>>> As far as I know, there are no plans to change the stand-alone
>>> deployment.
>>> FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
>>> etc.) which allow starting Flink processes per job.
>>>
>>> Till (in CC) is more familiar with the FLIP-6 effort and might be able
>>> to add more detail.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2016-12-01 4:16 GMT+01:00 Manu Zhang :
>>>
>>> Hi all,
>>>
>>> It seems tasks of different Flink applications can end up in the same
>>> JVM (TaskManager) in standalone mode. Isn't this fragile since errors in
>>> one application could crash another ? I checked FLIP-6
>>>  
>>> but
>>> didn't find any mention of changing it in the future.
>>>
>>> Any thoughts or have I missed anything ?
>>>
>>> Thanks,
>>> Manu Zhang
>>>
>>>
>>>
>>>
>


Re: microsecond resolution

2016-12-05 Thread jeff jacobson
Thanks for clearing things up, Stephan and Kostas

On Mon, Dec 5, 2016 at 8:08 AM, Kostas Kloudas 
wrote:

> Hi Jeff,
>
> As Stephan said the interpretation of the timestamps is up to the logic of
> your job.
> And as for the documentation, thanks for reporting this.
> We should update it.
>
> Cheers,
> Kostas
>
> On Dec 5, 2016, at 1:56 PM, Stephan Ewen  wrote:
>
> @Jeff - good point about the docs.
>
> I think Kostas is right though - the event timestamps are up to the user's
> interpretation.
>
> The built-in window assigners interpret them as "Unix Epoch Millis", but
> you can define your own window assigners that interpret the timestamps
> differently.
> The system interprets them as also as Unix Epoch Millis when mixing event
> time and processing time (because processing time comes from
> System.currentTimeMillis())
>
> So, you can "re-interpret" them by using custom window assigners and not
> using processing time.
> If you want to use a processing time like component, I'd suggest to
> incorporate that in your watermark generator.
>
> Stephan
>
>
>
> On Mon, Dec 5, 2016 at 1:05 PM, jeff jacobson  com> wrote:
>
>> Thanks Kostas. So if we're comfortable treating timestamps as longs (and
>> doing conversions to human readable time at our application level), we can
>> use CEP, ML lib etc. in addition to all basic Flink functions? That's great
>> news?
>>
>> To Matthias's point, why then does the following not read "Both
>> timestamps and watermarks are specified as longs"? Before I go headlong
>> into developing on Flink, I just want to be sure I'm covered here. Again,
>> thanks. (The Youtube videos from FlinkForward are also great, btw.
>> Incredibly impressed with Data Artisans.)
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>>
>> "Both timestamps and watermarks are specified as milliseconds since the Java
>> epoch of 1970-01-01T00:00:00Z."
>>
>>
>>
>> On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Jeff,
>>>
>>> Actually in Flink timestamps are simple longs.
>>> This means that you can assign anything you want as a timestamp, as long
>>> as it fits in a long.
>>>
>>> Hope this helps and if not, we can discuss to see if we can find a
>>> solution that
>>> fits your needs together.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Dec 4, 2016, at 11:39 PM, jeff jacobson <
>>> jeff.harold.jacob...@gmail.com> wrote:
>>>
>>> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most
>>> (all?) U.S. equity and European futures, options, and stock markets
>>> timestamp in microseconds. This makes Flink unusable for a massive industry
>>> vertical. To the extent lower-frequency time-series data is being used
>>> (e.g. end of data prices), stream processing is kind of overkill. Love
>>> everything I've read about Flink...there's got to be a way to make this
>>> work, no?
>>>
>>> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax 
>>> wrote:
>>>

 Oh. My bad... Did not read your question carefully enough.

 Than the answer is no, it does not support microseconds (only
 milliseconds).

 - -Matthias


 On 12/4/16 2:22 PM, jeff jacobson wrote:
 > Sorry if I'm missing something. That link mentions milliseconds,
 > no? My question is whether or not I can specify microseconds where
 > 1000microseconds = 1millisecond. Thanks!
 >
 > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax  > wrote:
 >
 > Yes. It does.
 >
 > See:
 > https://ci.apache.org/projects/flink/flink-docs-release-1.1/
 apis/strea
 ming/event_timestamps_watermarks.html#assigning-timestamps
 
 >
 >
 
 >
 >
 > "Both timestamps and watermarks are specified as millliseconds
 > since the Java epoch of 1970-01-01T00:00:00Z."
 >
 >
 >
 > -Matthias
 >
 >
 > On 12/04/2016 10:57 AM, jeff jacobson wrote:
 >> I've sourced stackoverflow, the docs, and the web but I can't
 >> figure out: does flink support microsecond timestamp resolution?
 >> Thanks!
 >
 >

Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler
We don't have to include it in 1.1.4 since Meters do not exist in 1.1;
my bad for tagging it in JIRA for 1.1.4.


On 05.12.2016 14:18, Ufuk Celebi wrote:

Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:

Hello Daniel,
  
I'm afraid you stumbled upon a bug in Flink. Meters were not properly

cleaned up, causing the underlying dropwizard meter update threads to
not be shutdown either.
  
I've opened a JIRA

and will open a PR soon.
  
Thank you for reporting this issue.
  
Regards,

Chesnay
  
On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that you need your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan


Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads are of 3 sources.
*
**OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

Locked ownable synchronizers:
- None
*
**Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None
*
*

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
*

"tend" - Thread t@29011
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
- None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in
RichSink .

Streaming job B runs with a parallel of 24 and runs Aerospike in
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

Batch Job runs Aerospike Client in RichFilterFunction /
RichMapFunction with open and close methods in order to open and
close the client.

Next thing I cancelled all the streaming jobs @5/12/2016 and checked
the threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB, threads go down, but some still
linger around and they are the following.

*Metrics --- 790 ( Flink Metrics Reporter it's the only 

Re: JVM Non Heap Memory

2016-12-05 Thread Ufuk Celebi
Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
> Hello Daniel,
>  
> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> cleaned up, causing the underlying dropwizard meter update threads to
> not be shutdown either.
>  
> I've opened a JIRA  
> and will open a PR soon.
>  
> Thank you for reporting this issue.
>  
> Regards,
> Chesnay
>  
> On 05.12.2016 12:05, Stefan Richter wrote:
> > Hi Daniel,
> >
> > the behaviour you observe looks like some threads are not canceled.
> > Thread cancelation in Flink (and Java in general) is always
> > cooperative, where cooperative means that the thread you want to
> > cancel should somehow check cancelation and react to it. Sometimes
> > this also requires some effort from the client that wants to cancel a
> > thread. So if you implement e.g. custom operators or functions with
> > aerospike, you must ensure that they a) react on cancelation and b)
> > cleanup their resources. If you do not consider this, your aerospike
> > client might stay in a blocking call forever, in particular blocking
> > IO calls are prone to this. What you need to ensure is that
> > cancelation from the clients includes closing IO resources such as
> > streams to unblock the thread and allow for termination. This means
> > that you need your code must (to a certain degree) actively
> > participate in Flink's task lifecycle. In Flink 1.2 we introduce a
> > feature called CloseableRegistry, which makes participating in this
> > lifecycle easier w.r.t. closing resources. For the time being, you
> > should check that Flink’s task cancelation also causes your code to
> > close the aerospike client and check cancelation flags.
> >
> > Best,
> > Stefan
> >
> >> Am 05.12.2016 um 11:42 schrieb Daniel Santos > >> >:
> >>
> >> Hello,
> >>
> >> I have done some threads checking and dumps. And I have disabled the
> >> checkpointing.
> >>
> >> Here are my findings.
> >>
> >> I did a thread dump a few hours after I booted up the whole cluster.
> >> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> >>
> >> The dump shows that most threads are of 3 sources.
> >> *
> >> **OutputFlusher --- 634 -- Sleeping State*
> >>
> >> "OutputFlusher" - Thread t@4758
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at
> >> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>   
> >>
> >> Locked ownable synchronizers:
> >> - None
> >> *
> >> **Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
> >> being used ) -- Parked State*
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >> java.lang.Thread.State: TIMED_WAITING
> >> at sun.misc.Unsafe.park(Native Method)
> >> - parking to wait for (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)  
> >> at
> >> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)  
> >> at
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>   
> >> at
> >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >> at
> >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>   
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >> *
> >> *
> >>
> >> *tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
> >> *
> >>
> >> "tend" - Thread t@29011
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >>
> >> I have 2 streaming jobs and a batch Job that runs once in a while.
> >>
> >> Streaming job A runs with a parallel of 2 and runs Aerospike only in
> >> RichSink .
> >>
> >> Streaming job B runs with a parallel of 24 and runs Aerospike in
> >> RichFilterFunction / RichMapFunction with open and close methods, in
> >> order to open and close the client.
> >>
> >> Batch Job runs Aerospike Client in 

Re: microsecond resolution

2016-12-05 Thread Kostas Kloudas
Hi Jeff,

As Stephan said the interpretation of the timestamps is up to the logic of your 
job.
And as for the documentation, thanks for reporting this. 
We should update it.

Cheers,
Kostas

> On Dec 5, 2016, at 1:56 PM, Stephan Ewen  wrote:
> 
> @Jeff - good point about the docs.
> 
> I think Kostas is right though - the event timestamps are up to the user's 
> interpretation.
> 
> The built-in window assigners interpret them as "Unix Epoch Millis", but you 
> can define your own window assigners that interpret the timestamps 
> differently.
> The system also interprets them as Unix Epoch Millis when mixing event 
> time and processing time (because processing time comes from 
> System.currentTimeMillis())
> 
> So, you can "re-interpret" them by using custom window assigners and not 
> using processing time.
> If you want to use a processing time like component, I'd suggest to 
> incorporate that in your watermark generator.
> 
> Stephan
> 
> 
> 
> On Mon, Dec 5, 2016 at 1:05 PM, jeff jacobson  > wrote:
> Thanks Kostas. So if we're comfortable treating timestamps as longs (and 
> doing conversions to human readable time at our application level), we can 
> use CEP, ML lib etc. in addition to all basic Flink functions? That's great 
> news?
> 
> To Matthias's point, why then does the following not read "Both timestamps 
> and watermarks are specified as longs"? Before I go headlong into developing 
> on Flink, I just want to be sure I'm covered here. Again, thanks. (The 
> Youtube videos from FlinkForward are also great, btw. Incredibly impressed 
> with Data Artisans.)
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>  
> 
> 
> "Both timestamps and watermarks are specified as milliseconds since the Java 
> epoch of 1970-01-01T00:00:00Z."
> 
> 
> 
> On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas  > wrote:
> Hi Jeff,
> 
> Actually in Flink timestamps are simple longs.
> This means that you can assign anything you want as a timestamp, as long as 
> it fits in a long.
> 
> Hope this helps and if not, we can discuss to see if we can find a solution 
> that 
> fits your needs together.
> 
> Cheers,
> Kostas
> 
>> On Dec 4, 2016, at 11:39 PM, jeff jacobson > > wrote:
>> 
>> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most (all?) 
>> U.S. equity and European futures, options, and stock markets timestamp in 
>> microseconds. This makes Flink unusable for a massive industry vertical. To 
>> the extent lower-frequency time-series data is being used (e.g. end of data 
>> prices), stream processing is kind of overkill. Love everything I've read 
>> about Flink...there's got to be a way to make this work, no?
>> 
>> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax > > wrote:
>> -BEGIN PGP SIGNED MESSAGE-
>> Hash: SHA512
>> 
>> Oh. My bad... Did not read your question carefully enough.
>> 
> >> Then the answer is no, it does not support microseconds (only
>> milliseconds).
>> 
>> - -Matthias
>> 
>> 
>> On 12/4/16 2:22 PM, jeff jacobson wrote:
>> > Sorry if I'm missing something. That link mentions milliseconds,
>> > no? My question is whether or not I can specify microseconds where
>> > 1000microseconds = 1millisecond. Thanks!
>> >
>> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax > > 
>> > >> wrote:
>> >
>> > Yes. It does.
>> >
>> > See:
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/strea
>> ming/event_timestamps_watermarks.html#assigning-timestamps 
>> 
>> >
>> >
>> > ing/event_timestamps_watermarks.html#assigning-timestamps 
>> >
>> >
>> > "Both timestamps and watermarks are specified as millliseconds
>> > since the Java epoch of 1970-01-01T00:00:00Z."
>> >
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
>> >> I've sourced stackoverflow, the docs, and the web but I can't
>> >> figure out: does flink support microsecond timestamp resolution?
>> >> Thanks!
>> >
>> >
>> -BEGIN PGP SIGNATURE-
>> Comment: GPGTools - https://gpgtools.org 
>> 
>> 

Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler

Hello Daniel,

I'm afraid you stumbled upon a bug in Flink. Meters were not properly 
cleaned up, so the underlying dropwizard meter update threads were not 
shut down either.


I've opened a JIRA and will open a PR soon.


Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled. 
Thread cancelation in Flink (and Java in general) is always 
cooperative, where cooperative means that the thread you want to 
cancel should somehow check cancelation and react to it. Sometimes 
this also requires some effort from the client that wants to cancel a 
thread. So if you implement e.g. custom operators or functions with 
aerospike, you must ensure that they a) react on cancelation and b) 
cleanup their resources. If you do not consider this, your aerospike 
client might stay in a blocking call forever, in particular blocking 
IO calls are prone to this. What you need to ensure is that 
cancelation from the clients includes closing IO resources such as 
streams to unblock the thread and allow for termination. This means 
that you need your code must (to a certain degree) actively 
participate in Flink's task lifecycle. In Flink 1.2 we introduce a 
feature called CloseableRegistry, which makes participating in this 
lifecycle easier w.r.t. closing resources. For the time being, you 
should check that Flink’s task cancelation also causes your code to 
close the aerospike client and check cancelation flags.


Best,
Stefan

On 05.12.2016 at 11:42, Daniel Santos wrote:


Hello,

I have done some threads checking and dumps. And I have disabled the 
checkpointing.


Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster. 
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )


The dump shows that most threads are of 3 sources.

OutputFlusher --- 634 -- Sleeping State

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)


   Locked ownable synchronizers:
- None

Metrics --- 376 ( Flink Metrics Reporter it's the only metrics being used ) -- Parked State


"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None

tend -- 220 ( Aerospike Client Thread ) -- Sleeping State

 "tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in 
RichSink .


Streaming job B runs with a parallel of 24 and runs Aerospike in 
RichFilterFunction / RichMapFunction with open and close methods, in 
order to open and close the client.


Batch Job runs Aerospike Client in RichFilterFunction / 
RichMapFunction with open and close methods in order to open and 
close the client.


Next thing I cancelled all the streaming jobs @5/12/2016 and checked 
the threads and the JVM non-heap usage.


JVM non-heap usage reaches 3GB, threads go down, but some still 
linger around and they are the following.


Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being used )


"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 

Re: microsecond resolution

2016-12-05 Thread Stephan Ewen
@Jeff - good point about the docs.

I think Kostas is right though - the event timestamps are up to the user's
interpretation.

The built-in window assigners interpret them as "Unix Epoch Millis", but
you can define your own window assigners that interpret the timestamps
differently.
The system also interprets them as Unix Epoch Millis when mixing event
time and processing time (because processing time comes from
System.currentTimeMillis())

So, you can "re-interpret" them by using custom window assigners and not
using processing time.
If you want to use a processing-time-like component, I'd suggest incorporating
that in your watermark generator.
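
A minimal sketch of such an assigner, treating the long timestamps as microseconds
end to end (the Tuple2 element layout and the 500-microsecond out-of-orderness bound
are illustrative assumptions, not something from this thread):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Elements are (microsecond epoch timestamp, payload).
public class MicrosecondAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, String>> {

    private long maxSeenMicros = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<Long, String> element, long previousElementTimestamp) {
        long ts = element.f0;                        // already microseconds, handed to Flink unchanged
        maxSeenMicros = Math.max(maxSeenMicros, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // tolerate 500 "units" of out-of-orderness; the unit is whatever the timestamps use
        return new Watermark(maxSeenMicros - 500);
    }
}

Keep in mind that the built-in time-based window assigners count window sizes in
milliseconds, so under this reinterpretation Time.milliseconds(1000) describes a
1000-microsecond window; custom window assigners avoid that mismatch.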

Stephan



On Mon, Dec 5, 2016 at 1:05 PM, jeff jacobson <
jeff.harold.jacob...@gmail.com> wrote:

> Thanks Kostas. So if we're comfortable treating timestamps as longs (and
> doing conversions to human readable time at our application level), we can
> use CEP, ML lib etc. in addition to all basic Flink functions? That's great
> news?
>
> To Matthias's point, *why then does the following not read "**Both
> timestamps and watermarks are specified as longs"?* Before I go headlong
> into developing on Flink, I just want to be sure I'm covered here. Again,
> thanks. (The Youtube videos from FlinkForward are also great, btw.
> Incredibly impressed with Data Artisans.)
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
> apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>
> "Both timestamps and watermarks are specified as milliseconds since the Java
> epoch of 1970-01-01T00:00:00Z."
>
>
>
> On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Jeff,
>>
>> Actually in Flink timestamps are simple longs.
>> This means that you can assign anything you want as a timestamp, as long
>> as it fits in a long.
>>
>> Hope this helps and if not, we can discuss to see if we can find a
>> solution that
>> fits your needs together.
>>
>> Cheers,
>> Kostas
>>
>> On Dec 4, 2016, at 11:39 PM, jeff jacobson > m> wrote:
>>
>> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most
>> (all?) U.S. equity and European futures, options, and stock markets
>> timestamp in microseconds. This makes Flink unusable for a massive industry
>> vertical. To the extent lower-frequency time-series data is being used
>> (e.g. end of day prices), stream processing is kind of overkill. Love
>> everything I've read about Flink...there's got to be a way to make this
>> work, no?
>>
>> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax  wrote:
>>
>>> -BEGIN PGP SIGNED MESSAGE-
>>> Hash: SHA512
>>>
>>> Oh. My bad... Did not read your question carefully enough.
>>>
> >>> Then the answer is no, it does not support microseconds (only
>>> milliseconds).
>>>
>>> - -Matthias
>>>
>>>
>>> On 12/4/16 2:22 PM, jeff jacobson wrote:
>>> > Sorry if I'm missing something. That link mentions milliseconds,
>>> > no? My question is whether or not I can specify microseconds where
>>> > 1000microseconds = 1millisecond. Thanks!
>>> >
>>> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax >> > > wrote:
>>> >
>>> > Yes. It does.
>>> >
>>> > See:
>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/strea
>>> ming/event_timestamps_watermarks.html#assigning-timestamps
>>> 
>>> >
>>> >
>>> >> ing/event_timestamps_watermarks.html#assigning-timestamps
>>> 
>>> >
>>> >
>>> > "Both timestamps and watermarks are specified as millliseconds
>>> > since the Java epoch of 1970-01-01T00:00:00Z."
>>> >
>>> >
>>> >
>>> > -Matthias
>>> >
>>> >
>>> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
>>> >> I've sourced stackoverflow, the docs, and the web but I can't
>>> >> figure out: does flink support microsecond timestamp resolution?
>>> >> Thanks!
>>> >
>>> >
>>> -BEGIN PGP SIGNATURE-
>>> Comment: GPGTools - https://gpgtools.org
>>>
>>> iQIYBAEBCgAGBQJYRJhUAAoJELz8Z8hxAGOiNKoP32ChGeNd7N8Zco2q6lsu+Hxd
>>> JZq62ey3wTrIUS+3oRlILwnu81cViQHtMMVBly3+YnqB85gNiaEUxEQTQCdKPl8G
>>> AqxoFIkMcrKGzwGXigKnCAoVIiyuPeNuhY1d1yv4rWrkt7qb0lCC02Xoq1C0hoS6
>>> Stwk62GXmNRXPYpyjnSq/iAIMbjWaU+ZU0t4V3J8loroNuJ5QcUsJLfRXeo3/5ho
>>> f42L+IANyB5K7vnTxNZYyf5ShNVbTY9/iFaviluxrCNztqGTo7CxMpcyWyMS3wcF
>>> ycXcq/daB+guEJpW0sm4JtMPSsQ/kN99c/ig3t0HX1kDV7xrDDSF2qPvbYOWF38n
>>> omTr7RY3YRFi5LOKvBGa96Aw5UYjMddjcqozWId6xgdXfvz6RUeJCWa9RW8I6ptg
>>> 8TaJpM2WgDJMgMuzdl8dDv65l78DkLlNlNo53O66b/9Pt78P75KNjj8naD5kkj4C
>>> i9amwnUNNEnZucA2/1vhzr6cVSzrzBLL7juVj0VmABZo4itUZjjR0UkN7MB+ioWU
>>> trNhaXgE6EP/160n6D0/NUu02prm3jq8mK6gu9lZFWGbAeCUcch+CbvWSaiXAw3H

Re: microsecond resolution

2016-12-05 Thread jeff jacobson
Thanks Kostas. So if we're comfortable treating timestamps as longs (and
doing conversions to human readable time at our application level), we can
use CEP, ML lib etc. in addition to all basic Flink functions? That's great
news?

To Matthias's point, *why then does the following not read "**Both
timestamps and watermarks are specified as longs"?* Before I go headlong
into developing on Flink, I just want to be sure I'm covered here. Again,
thanks. (The Youtube videos from FlinkForward are also great, btw.
Incredibly impressed with Data Artisans.)


https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/
event_timestamps_watermarks.html#assigning-timestamps

"Both timestamps and watermarks are specified as milliseconds since the Java
epoch of 1970-01-01T00:00:00Z."



On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas 
wrote:

> Hi Jeff,
>
> Actually in Flink timestamps are simple longs.
> This means that you can assign anything you want as a timestamp, as long
> as it fits in a long.
>
> Hope this helps and if not, we can discuss to see if we can find a
> solution that
> fits your needs together.
>
> Cheers,
> Kostas
>
> On Dec 4, 2016, at 11:39 PM, jeff jacobson 
> wrote:
>
> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most
> (all?) U.S. equity and European futures, options, and stock markets
> timestamp in microseconds. This makes Flink unusable for a massive industry
> vertical. To the extent lower-frequency time-series data is being used
> (e.g. end of day prices), stream processing is kind of overkill. Love
> everything I've read about Flink...there's got to be a way to make this
> work, no?
>
> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax  wrote:
>
>> -BEGIN PGP SIGNED MESSAGE-
>> Hash: SHA512
>>
>> Oh. My bad... Did not read your question carefully enough.
>>
> >> Then the answer is no, it does not support microseconds (only
>> milliseconds).
>>
>> - -Matthias
>>
>>
>> On 12/4/16 2:22 PM, jeff jacobson wrote:
>> > Sorry if I'm missing something. That link mentions milliseconds,
>> > no? My question is whether or not I can specify microseconds where
>> > 1000microseconds = 1millisecond. Thanks!
>> >
>> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax > > > wrote:
>> >
>> > Yes. It does.
>> >
>> > See:
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/strea
>> ming/event_timestamps_watermarks.html#assigning-timestamps
>> 
>> >
>> >
>> > ing/event_timestamps_watermarks.html#assigning-timestamps
>> 
>> >
>> >
>> > "Both timestamps and watermarks are specified as millliseconds
>> > since the Java epoch of 1970-01-01T00:00:00Z."
>> >
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
>> >> I've sourced stackoverflow, the docs, and the web but I can't
>> >> figure out: does flink support microsecond timestamp resolution?
>> >> Thanks!
>> >
>> >
>> -BEGIN PGP SIGNATURE-
>> Comment: GPGTools - https://gpgtools.org
>>
>> iQIYBAEBCgAGBQJYRJhUAAoJELz8Z8hxAGOiNKoP32ChGeNd7N8Zco2q6lsu+Hxd
>> JZq62ey3wTrIUS+3oRlILwnu81cViQHtMMVBly3+YnqB85gNiaEUxEQTQCdKPl8G
>> AqxoFIkMcrKGzwGXigKnCAoVIiyuPeNuhY1d1yv4rWrkt7qb0lCC02Xoq1C0hoS6
>> Stwk62GXmNRXPYpyjnSq/iAIMbjWaU+ZU0t4V3J8loroNuJ5QcUsJLfRXeo3/5ho
>> f42L+IANyB5K7vnTxNZYyf5ShNVbTY9/iFaviluxrCNztqGTo7CxMpcyWyMS3wcF
>> ycXcq/daB+guEJpW0sm4JtMPSsQ/kN99c/ig3t0HX1kDV7xrDDSF2qPvbYOWF38n
>> omTr7RY3YRFi5LOKvBGa96Aw5UYjMddjcqozWId6xgdXfvz6RUeJCWa9RW8I6ptg
>> 8TaJpM2WgDJMgMuzdl8dDv65l78DkLlNlNo53O66b/9Pt78P75KNjj8naD5kkj4C
>> i9amwnUNNEnZucA2/1vhzr6cVSzrzBLL7juVj0VmABZo4itUZjjR0UkN7MB+ioWU
>> trNhaXgE6EP/160n6D0/NUu02prm3jq8mK6gu9lZFWGbAeCUcch+CbvWSaiXAw3H
>> BOieCsgZD1wfXQJ3wEmnqj/YP94uDlx1IjynskDevjk6OIyIysbBSIqgsUK6fvQ8
>> ztXO6ls7ARMOBmA=
>> =/O+Q
>> -END PGP SIGNATURE-
>>
>
>
>


Re: Flink 1.1.3 OOME Permgen

2016-12-05 Thread Konstantin Knauf
Yep, I would suppose so. You need to have the reference from the
AppClassLoader to the UserCodeClassLoader.

On 05.12.2016 12:37, Robert Metzger wrote:
> I executed this snippet in each Flink job:
> 
> @Override
> public void open(Configuration config) {
>   ObjectMapper somethingWithJackson = new ObjectMapper();
>   try {
> ObjectNode on = somethingWithJackson.readValue("{\"a\": \"b\"}",
> ObjectNode.class);
>   } catch (IOException e) {
> throw new RuntimeException("You failed", e);
>   }
> }
> 
> But I suspect that I need to map my JSON to a POJO?
>  
> 
> On Mon, Dec 5, 2016 at 12:33 PM, Konstantin Knauf
> > wrote:
> 
> Hi Robert,
> 
> you need to actually use Jackson. The problematic field is a cache,
> which is filled by all classes, which were serialized/deserialized by
> Jackson.
> 
> Best,
> 
> Konstantin
> 
> On 05.12.2016 11 :55, Robert Metzger wrote:
> > I've submitted Wordcount 410 times to a testing cluster and a streaming
> > job 290 times and I could not reproduce the issue with 1.1.3. Also, the
> > heapdump of one of the TaskManagers looked pretty normal.
> >
> > Do you have any ideas how to reproduce the issue?
> >
> > On Fri, Dec 2, 2016 at 3:21 PM, Robert Metzger  
> > >> wrote:
> >
> > Thank you for reporting the issue Konstantin.
> > I've filed a JIRA for the jackson
> > issue: https://issues.apache.org/jira/browse/FLINK-5233
> 
> >  >.
> > As I said in the JIRA, I propose to upgrade to Jackson 2.7.8, as
> > this version contains the fix for the issue, but its not a major
> > jackson upgrade.
> >
> > Any chance you could try to see if 2.7.8 fixes the issue as well?
> >
> >
> > On Fri, Dec 2, 2016 at 11:12 AM, Fabian Hueske  
> > >> wrote:
> >
> > Hi Konstantin,
> >
> > Regarding 2): I've opened FLINK-5227 to update the documentation
> > [1].
> >
> > Regarding the Row type: The Row type was introduced for
> > flink-table and was later used by other modules. There is
> > FLINK-5186 to move Row and all the related TypeInfo (+serializer
> > and comparator) to flink-core [2]. That should solve your issue.
> >
> > Some of the connector modules which provide TableSource and
> > TableSinks have dependencies on flink-table as well. I'll check
> > that these are optional dependencies to avoid that we pull in
> > Calcite through connectors for jobs that do not need it.
> >
> > Thanks,
> > Fabian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-5227
> 
> >  >
> > [2] https://issues.apache.org/jira/browse/FLINK-5186
> 
> >  >
> >
> > 2016-11-30 17:51 GMT+01:00 Konstantin Knauf
> >  
> >  >>:
> >
> > Hi Stefan,
> >
> > unfortunately, I can not share any heap dumps with you. I
> > was able to
> > resolve some of the issues my self today, the root causes
> > were different
> > for different jobs.
> >
> > 1) Jackson 2.7.2 (which comes with Flink) has a known class
> > loading
> > issue (see
> > https://github.com/FasterXML/jackson-databind/issues/1363
> 
> >   
>   >).
> > Shipping a shaded version of Jackson 2.8.4 with our user
> > code helped. I
> > recommend upgrading Flink's Jackson version soon.
> >
> > 2) We have a dependency on the flink-table [1] , which
> ships
> > with
> > Calcite including 

Re: Flink 1.1.3 OOME Permgen

2016-12-05 Thread Robert Metzger
I executed this snippet in each Flink job:

@Override
public void open(Configuration config) {
  ObjectMapper somethingWithJackson = new ObjectMapper();
  try {
ObjectNode on = somethingWithJackson.readValue("{\"a\": \"b\"}",
ObjectNode.class);
  } catch (IOException e) {
throw new RuntimeException("You failed", e);
  }
}

But I suspect that I need to map my JSON to a POJO?
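
A minimal sketch of that variant, with a made-up user POJO (class and field names
below are illustrative): deserializing into a class from the job jar, rather than
into ObjectNode, is what fills Jackson's type cache with user-code classes.

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JacksonProbe extends RichMapFunction<String, String> {

    // made-up POJO that lives in the user jar
    public static class MyPojo {
        public String a;
    }

    @Override
    public void open(Configuration config) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            // binding to a user-code class is what populates the problematic cache
            MyPojo pojo = mapper.readValue("{\"a\": \"b\"}", MyPojo.class);
        } catch (IOException e) {
            throw new RuntimeException("You failed", e);
        }
    }

    @Override
    public String map(String value) {
        return value;
    }
}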


On Mon, Dec 5, 2016 at 12:33 PM, Konstantin Knauf <
konstantin.kn...@tngtech.com> wrote:

> Hi Robert,
>
> you need to actually use Jackson. The problematic field is a cache,
> which is filled by all classes, which were serialized/deserialized by
> Jackson.
>
> Best,
>
> Konstantin
>
> On 05.12.2016 11:55, Robert Metzger wrote:
> > I've submitted Wordcount 410 times to a testing cluster and a streaming
> > job 290 times and I could not reproduce the issue with 1.1.3. Also, the
> > heapdump of one of the TaskManagers looked pretty normal.
> >
> > Do you have any ideas how to reproduce the issue?
> >
> > On Fri, Dec 2, 2016 at 3:21 PM, Robert Metzger  > > wrote:
> >
> > Thank you for reporting the issue Konstantin.
> > I've filed a JIRA for the jackson
> > issue: https://issues.apache.org/jira/browse/FLINK-5233
> > .
> > As I said in the JIRA, I propose to upgrade to Jackson 2.7.8, as
> > this version contains the fix for the issue, but its not a major
> > jackson upgrade.
> >
> > Any chance you could try to see if 2.7.8 fixes the issue as well?
> >
> >
> > On Fri, Dec 2, 2016 at 11:12 AM, Fabian Hueske  > > wrote:
> >
> > Hi Konstantin,
> >
> > Regarding 2): I've opened FLINK-5227 to update the documentation
> > [1].
> >
> > Regarding the Row type: The Row type was introduced for
> > flink-table and was later used by other modules. There is
> > FLINK-5186 to move Row and all the related TypeInfo (+serializer
> > and comparator) to flink-core [2]. That should solve your issue.
> >
> > Some of the connector modules which provide TableSource and
> > TableSinks have dependencies on flink-table as well. I'll check
> > that these are optional dependencies to avoid that we pull in
> > Calcite through connectors for jobs that do not need it.
> >
> > Thanks,
> > Fabian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-5227
> > 
> > [2] https://issues.apache.org/jira/browse/FLINK-5186
> > 
> >
> > 2016-11-30 17:51 GMT+01:00 Konstantin Knauf
> >  > >:
> >
> > Hi Stefan,
> >
> > unfortunately, I can not share any heap dumps with you. I
> > was able to
> > resolve some of the issues my self today, the root causes
> > were different
> > for different jobs.
> >
> > 1) Jackson 2.7.2 (which comes with Flink) has a known class
> > loading
> > issue (see
> > https://github.com/FasterXML/jackson-databind/issues/1363
> > )
> .
> > Shipping a shaded version of Jackson 2.8.4 with our user
> > code helped. I
> > recommend upgrading Flink's Jackson version soon.
> >
> > 2) We have a dependency on the flink-table [1] , which ships
> > with
> > Calcite including the Calcite JDBC Driver, which can not
> > been collected
> > cause of the known problem with the java.sql.DriverManager.
> > Putting the
> > flink-table in Flink's lib dir instead of shipping it with
> > the user code
> > helps. You should update the documentation, because this
> > will always
> > happen when using flink-table, I think. So I wonder, why
> > this has not
> > come up before actually.
> >
> > 3) Unresolved: Some Threads in a custom source which are not
> > proberly
> > shut down and keep references to the UserCodeClassLoader. I
> > did not have
> > time to look into this issue so far.
> >
> > Cheers,
> >
> > Konstantin
> >
> > [1] Side note: We only need flink-table for the "Row" class
> > used in the
> > JdbcOutputFormat, so it might make sense to move this class
> > somewhere
> > else. Naturally, we also tried to exclude the "transitive"
> > dependency on
> > org.apache.calcite until we noticed that calcite is packaged
> >   

Re: flink-job-in-yarn,has max memory

2016-12-05 Thread Robert Metzger
Hi,

The TaskManager reports a total memory usage of 3 GB. That's fine, given
that you requested containers of size 4GB. Flink doesn't allocate all the
memory assigned to the container to the heap.
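
For reference, the ~3 GB heap matches the default YARN heap cutoff arithmetic
(assuming the default cutoff ratio of that release; the key lives in flink-conf.yaml):

# requested per-TaskManager container: -ytm 4096 (MB)
# JVM heap = container size * (1 - yarn.heap-cutoff-ratio)
#          = 4096 MB * (1 - 0.25) = 3072 MB, i.e. roughly the 3 GB reported as "Heap"
yarn.heap-cutoff-ratio: 0.25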

Are you running a batch or a streaming job?


On Tue, Nov 29, 2016 at 12:43 PM,  wrote:

> Hi,
>  i have a flink job,and abt assembly to get a jar file,so i put it to
> I have a flink job and build an assembly jar, so I put it on yarn and run it
> with the following command:
> /home/www/flink-1.1.1/bin/flink run \
> -m yarn-cluster \
> -yn 1 \
> -ys 2 \
> -yjm 4096 \
> -ytm 4096 \
> --class skRecomm.SkProRecommFlink \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar
> \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar \
> --classpath 
> file:///opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar
> \
> --classpath file:///opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar
> \
> /home/www/flink-mining/deploy/zx_article-7cffb87.jar
> 
> ---
> the command is run under supervisor on a computer (*.*.*.22),
> 
> and  flink/conf/flink-conf.yaml,i set those pargam,
> --
> fs.hdfs.hadoopconf: /etc/hadoop/conf/
> jobmanager.web.port: 8081
> parallelism.default: 1
> taskmanager.memory.preallocate: false
> taskmanager.numberOfTaskSlots: 1
> taskmanager.heap.mb: 512
> jobmanager.heap.mb: 256
> arallelism.default: 1
> jobmanager.rpc.port: 6123
> jobmanager.rpc.address: localhost
>
> --
> the job succeeds, and I can find the following information in the yarn monitor,
>
> flink.base.dir.path /data1/yarn/nm/usercache/work/appcache/application_
> 1472623395420_36719/container_e03_1472623395420_36719_01_01
> fs.hdfs.hadoopconf /etc/hadoop/conf/
> jobmanager.heap.mb 256
> jobmanager.rpc.address *.*.*.79  -(is not *.*.*.22,and taskmanager is
> *.*.*.69)
> jobmanager.rpc.port 32987
> jobmanager.web.port 0
> parallelism.default 1
> recovery.zookeeper.path.namespace application_1472623395420_36719
> taskmanager.heap.mb 512
> taskmanager.memory.preallocate false
> taskmanager.numberOfTaskSlots 1
>
> -
> Overview
> Data Port: 30471 | All Slots: 2 | Free Slots: 0 | CPU Cores: 32 |
> Physical Memory: 189 GB | Free Memory: 2.88 GB | Flink Managed Memory: 1.96 GB
> 
> ---
> Memory
> JVM (Heap/Non-Heap)
> Type      Committed   Initial    Maximum
> Heap      2.92 GB     3.00 GB    2.92 GB
> Non-Heap  53.4 MB     23.4 MB    130 MB
> Total     2.97 GB     3.02 GB    3.04 GB
> -
> Outside JVM
> Type Count Used Capacity
> Direct 510860 KB 860 KB
> Mapped 0  0 B  0 B
> ---
>
> I find that on computer (*.*.*.22) the process with pid=345 uses 2.36g of
> memory, and pid=345 is the job started from supervisor.
>
> I really do not understand why: the job runs in yarn, so why does it occupy so
> much memory on computer (*.*.*.22)? I only submitted the job from computer (*.*.*.22).
>
> Thank you for answering my question.
>
>


Re: Flink 1.1.3 OOME Permgen

2016-12-05 Thread Konstantin Knauf
Hi Robert,

you need to actually use Jackson. The problematic field is a cache,
which is filled by all classes, which were serialized/deserialized by
Jackson.

Best,

Konstantin

On 05.12.2016 11:55, Robert Metzger wrote:
> I've submitted Wordcount 410 times to a testing cluster and a streaming
> job 290 times and I could not reproduce the issue with 1.1.3. Also, the
> heapdump of one of the TaskManagers looked pretty normal.
> 
> Do you have any ideas how to reproduce the issue?
> 
> On Fri, Dec 2, 2016 at 3:21 PM, Robert Metzger  > wrote:
> 
> Thank you for reporting the issue Konstantin.
> I've filed a JIRA for the jackson
> issue: https://issues.apache.org/jira/browse/FLINK-5233
> .
> As I said in the JIRA, I propose to upgrade to Jackson 2.7.8, as
> this version contains the fix for the issue, but its not a major
> jackson upgrade.
> 
> Any chance you could try to see if 2.7.8 fixes the issue as well?
> 
> 
> On Fri, Dec 2, 2016 at 11:12 AM, Fabian Hueske  > wrote:
> 
> Hi Konstantin,
> 
> Regarding 2): I've opened FLINK-5227 to update the documentation
> [1].
> 
> Regarding the Row type: The Row type was introduced for
> flink-table and was later used by other modules. There is
> FLINK-5186 to move Row and all the related TypeInfo (+serializer
> and comparator) to flink-core [2]. That should solve your issue.
> 
> Some of the connector modules which provide TableSource and
> TableSinks have dependencies on flink-table as well. I'll check
> that these are optional dependencies to avoid that we pull in
> Calcite through connectors for jobs that do not need it.
> 
> Thanks,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-5227
> 
> [2] https://issues.apache.org/jira/browse/FLINK-5186
> 
> 
> 2016-11-30 17:51 GMT+01:00 Konstantin Knauf
>  >:
> 
> Hi Stefan,
> 
> unfortunately, I can not share any heap dumps with you. I
> was able to
> resolve some of the issues my self today, the root causes
> were different
> for different jobs.
> 
> 1) Jackson 2.7.2 (which comes with Flink) has a known class
> loading
> issue (see
> https://github.com/FasterXML/jackson-databind/issues/1363
> ).
> Shipping a shaded version of Jackson 2.8.4 with our user
> code helped. I
> recommend upgrading Flink's Jackson version soon.
> 
> 2) We have a dependency on the flink-table [1] , which ships
> with
> Calcite including the Calcite JDBC Driver, which can not
> been collected
> cause of the known problem with the java.sql.DriverManager.
> Putting the
> flink-table in Flink's lib dir instead of shipping it with
> the user code
> helps. You should update the documentation, because this
> will always
> happen when using flink-table, I think. So I wonder, why
> this has not
> come up before actually.
> 
> 3) Unresolved: Some Threads in a custom source which are not
> proberly
> shut down and keep references to the UserCodeClassLoader. I
> did not have
> time to look into this issue so far.
> 
> Cheers,
> 
> Konstantin
> 
> [1] Side note: We only need flink-table for the "Row" class
> used in the
> JdbcOutputFormat, so it might make sense to move this class
> somewhere
> else. Naturally, we also tried to exclude the "transitive"
> dependency on
> org.apache.calcite until we noticed that calcite is packaged
> with
> flink-table, so that you can not even exclude it. What is
> the reasons
> for this?
> 
> 
> 
> 
> On 30.11.2016 00:55, Stefan Richter wrote:
> > Hi,
> >
> > could you somehow provide us a heap dump from a TM that
> run for a while (ideally, shortly before an OOME)? This
> would greatly help us to figure out if there is a
> classloader leak that causes the problem.
> >
> > Best,
> > Stefan
> >
> >> Am 29.11.2016 um 18:39 schrieb Konstantin Knauf
> 

Re: CEP issue

2016-12-05 Thread Robert Metzger
Hi Kieran,

which statebackend are you using for your CEP job? Using RocksDB as a state
backend could potentially fix the issue.
What's the number of keys in your stream?
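
In case it helps, a minimal sketch of switching to RocksDB (the checkpoint URI is a
placeholder, and the flink-statebackend-rocksdb dependency must be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keyed state, including the per-key state of the CEP operator, is then kept
        // in RocksDB on local disk instead of on the JVM heap
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60000);
        // ... build messageStream, patterns and pattern streams as before, then env.execute()
    }
}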


On Tue, Nov 29, 2016 at 3:18 PM, kieran .  wrote:

> Hello,
>
> I am currently building a multi-tenant monitoring application and
> exploring the effectiveness of different Complex Event Processors (CEP) and
> whether or not this would be a potential solution for what I want to
> achieve. I have created a small test application which utilises Flink and
> its CEP but I have come across some issues when dealing with a large number
> of metrics to monitor when using patterns/pattern streams. Flink seems to
> operate as expected with one, or several patterns each consuming it's own
> PatternStream, but as soon as more are introduced the memory usage of Flink
> seems to rise rather quickly and eventually throw an OutOfMemoryError. My
> initial idea was to create one pattern/pattern stream for each metric that
> I need to monitor, but there could be many thousands of these.
>
> I create the PatternStream per Pattern like this to monitor a metric:
>
> Pattern pattern = Pattern.begin( patternName ).subtype( MetricData.class )
> .where(
> (evt -> evt.getValues().get( "max" ).longValue() > 50.0
> && evt.account_id.equals( accountName )) );
>
> check.withPattern( pattern )
> .withTimePeriod( Integer.valueOf( 1 ) )
> .withCooldown( Integer.valueOf( 1 ) )
> .withName( checkName )
> .withAlertStatus( AlertStatus.OK )
> .setPatternStream(CEP.pattern(messageStream.keyBy("account_id"), pattern));
>
>
> To trigger these patterns, I use
>
> PatternSelectFunction psf = new PatternSelectFunction()
> {
> @Override
> public MetricWarning select( Map map ) throws Exception
> {
> return new MetricWarning(map.get(patternKey), name, accountId);
> }
> };
>
> try
> {
> check.getPatternStream().select(psf);
> }
> catch( Exception exception )
> {
> exception.printStackTrace();
> }
>
>
>
> The pattern in the above example is tied to a specific stream which would
> result in one stream per pattern and this seems to be an issue using this
> approach. If it would be possible to run one pattern stream and switching
> out the patterns when needed, then perhaps this would be a viable solution.
> Am I approaching this in the right way by creating a stream for each
> pattern?
>
> Would it be possible to create a set of Pattern processors that could be
> run against a single PatternStream or is there anything you could suggest
> which would allow me to do this with Flink?
>
> Thanks,
> - Kieran
>
> 
>


Equivalent of Rx combineLatest() on a join?

2016-12-05 Thread denis.dollfus
Hi all,

[first email here, I'm new to Flink, Java and Scala, sorry if I missed 
something obvious]

I'm exploring Flink in the context of streaming calculators. Basically, the 
data flow boils down to multiple data streams with variable update rates (ms, 
seconds, ..., month) which are joined before being fed to calculators. The kind 
of operation I need is very similar to the Rx 
combineLatest 
operator, which results in a object being emitted whenever one of the streams 
is updated.

As there is no such operator predefined, I think I have to use a GlobalWindow 
and provide a custom WindowAssigner. The end result would look like this 
(pseudo java 8 code, I hope it's understandable):

DataStream s1 = env.addSource(..);
DataStream s2 = env.addSource(..);

S3 = s1.join(s2)
.where(s1 -> id)
.equalTo(s2 -> id)
.window(new MyCustomCombineLatestAssigner())
.apply( ... return new object combining data from s1 and from 
s2);

Is the approach correct, or is there a simpler way to achieve the same join + 
apply mechanism ?

Thank you,

Denis







Re: Dealing with Multiple sinks in Flink

2016-12-05 Thread Robert Metzger
For enabling JMX when starting Flink from your IDE, you need to do the
following:

Configuration configuration = new Configuration();
configuration.setString("metrics.reporters", "my_jmx_reporter");
configuration.setString("metrics.reporter.my_jmx_reporter.class",
"org.apache.flink.metrics.jmx.JMXReporter");
configuration.setString("metrics.reporter.my_jmx_reporter.port",
"9020-9040");
StreamExecutionEnvironment see =
StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
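
The reporter then logs the port it actually bound from that range ("Started JMX
server on port ..."), and a JMX client can attach to it, e.g.:

jconsole localhost:9020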





On Mon, Dec 5, 2016 at 11:56 AM, vinay patil 
wrote:

> Yes I had configured it as given in the documentation.
> I can see this line in Job Manager Logs : Started JMX server on port 9020
> (but this was on EMR )
>
> How to do this locally ? can we check these metrics while running the
> pipeline from IDE ? If yes what is the default JMX port to connect ? or do
> we need to do some configuration locally ?
>
> Regards,
> Vinay Patil
>
> On Mon, Dec 5, 2016 at 4:14 PM, rmetzger0 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> > wrote:
>
>> Hi Vinay,
>>
>> the JMX port depends on the port you've configured for the JMX metrics
>> reporter.
>> Did you configure it?
>>
>> Regards,
>> Robert
>>
>>
>> On Fri, Dec 2, 2016 at 11:14 AM, vinay patil <[hidden email]
>> > wrote:
>>
>>> Hi Robert,
>>>
>>> I had resolved this issue earlier as I had not set the Kafka source
>>> parallelism to number of partitions, so I was getting the issue of window
>>> not getting triggered.
>>>
>>> Now I am facing the same issue, I tried to check the watermark value by
>>> using visualVM locally but I am not seeing that value there, I have
>>> attached
>>> the snapshot of visualVM
>>> >> bble.com/file/n10412/jVisualVMMetrics.png>
>>>
>>> Just to verify , JMX port runs on 9010 by default , right ?, because
>>> when I
>>> tried to connect to it locally, I could not connect
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-flink-user-maili
>>> ng-list-archive.2336050.n4.nabble.com/Dealing-with-Multiple-
>>> sinks-in-Flink-tp8643p10412.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>>
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Dealing-with-Multiple-sinks-in-Flink-tp8643p10442.html
>>
>
>
> --
> View this message in context: Re: Dealing with Multiple sinks in Flink
> 
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>


Re: Dealing with Multiple sinks in Flink

2016-12-05 Thread vinay patil
Yes I had configured it as given in the documentation.
I can see this line in Job Manager Logs : Started JMX server on port 9020
(but this was on EMR )

How to do this locally ? can we check these metrics while running the
pipeline from IDE ? If yes what is the default JMX port to connect ? or do
we need to do some configuration locally ?

Regards,
Vinay Patil

On Mon, Dec 5, 2016 at 4:14 PM, rmetzger0 [via Apache Flink User Mailing
List archive.]  wrote:

> Hi Vinay,
>
> the JMX port depends on the port you've configured for the JMX metrics
> reporter.
> Did you configure it?
>
> Regards,
> Robert
>
>
> On Fri, Dec 2, 2016 at 11:14 AM, vinay patil <[hidden email]
> > wrote:
>
>> Hi Robert,
>>
>> I had resolved this issue earlier as I had not set the Kafka source
>> parallelism to number of partitions, so I was getting the issue of window
>> not getting triggered.
>>
>> Now I am facing the same issue, I tried to check the watermark value by
>> using visualVM locally but I am not seeing that value there, I have
>> attached
>> the snapshot of visualVM
>> > nabble.com/file/n10412/jVisualVMMetrics.png>
>>
>> Just to verify , JMX port runs on 9010 by default , right ?, because when
>> I
>> tried to connect to it locally, I could not connect
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Dealing-with-Multip
>> le-sinks-in-Flink-tp8643p10412.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Dealing-with-Multiple-sinks-in-Flink-tp8643p10442.html
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dealing-with-Multiple-sinks-in-Flink-tp8643p10445.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: JVM Non Heap Memory

2016-12-05 Thread Stefan Richter
Hi Daniel,

the behaviour you observe looks like some threads are not canceled. Thread 
cancelation in Flink (and Java in general) is always cooperative, where 
cooperative means that the thread you want to cancel should somehow check 
cancelation and react to it. Sometimes this also requires some effort from the 
client that wants to cancel a thread. So if you implement e.g. custom operators 
or functions with aerospike, you must ensure that they a) react on cancelation 
and b) cleanup their resources. If you do not consider this, your aerospike 
client might stay in a blocking call forever, in particular blocking IO calls 
are prone to this. What you need to ensure is that cancelation from the clients 
includes closing IO resources such as streams to unblock the thread and allow 
for termination. This means that you need your code must (to a certain degree) 
actively participate in Flink's task lifecycle. In Flink 1.2 we introduce a 
feature called CloseableRegistry, which makes participating in this lifecycle 
easier w.r.t. closing resources. For the time being, you should check that 
Flink’s task cancelation also causes your code to close the aerospike client 
and check cancelation flags.
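
As an illustration only (not the code from this thread), a minimal sketch of a sink
that ties the Aerospike client to the task lifecycle, so that cancelation reaches
close() and releases the client's threads and sockets:

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class AerospikeSink extends RichSinkFunction<String> {

    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) {
        // one client per parallel task, created when the task starts
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public void invoke(String value) {
        // namespace / set / bin names are placeholders
        Key key = new Key("test", "events", value);
        client.put(null, key, new Bin("payload", value));
    }

    @Override
    public void close() {
        // called on normal shutdown and on cancelation; closing here unblocks
        // pending IO and lets the client's "tend" thread terminate
        if (client != null) {
            client.close();
        }
    }
}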

Best,
Stefan

> On 05.12.2016 at 11:42, Daniel Santos wrote:
> 
> Hello,
> 
> I have done some threads checking and dumps. And I have disabled the 
> checkpointing.
> Here are my findings. 
> I did a thread dump a few hours after I booted up the whole cluster. 
> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> 
> The dump shows that most threads are of 3 sources.
> 
> OutputFlusher --- 634 -- Sleeping State
> 
> "OutputFlusher" - Thread t@4758
>java.lang.Thread.State: TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> 
>Locked ownable synchronizers:
> - None
> 
> Metrics --- 376 ( Flink Metrics Reporter it's the only metrics being used ) 
> -- Parked State
> 
> "metrics-meter-tick-thread-1" - Thread t@29024
>java.lang.Thread.State: TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
>Locked ownable synchronizers:
> - None
> 
> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
> 
>  "tend" - Thread t@29011
>java.lang.Thread.State: TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at com.aerospike.client.util.Util.sleep(Util.java:38)
> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> at java.lang.Thread.run(Thread.java:745)
> 
>Locked ownable synchronizers:
> - None
> 
> I have 2 streaming jobs and a batch Job that runs once in a while.
> 
> Streaming job A runs with a parallel of 2 and runs Aerospike only in RichSink 
> .
> 
> Streaming job B runs with a parallel of 24 and runs Aerospike in 
> RichFilterFunction / RichMapFunction with open and close methods, in order to 
> open and close the client.
> 
> Batch Job runs Aerospike Client in RichFilterFunction / RichMapFunction with 
> open and close methods in order to open and close the client.
> 
> Next thing I cancelled all the streaming jobs @5/12/2016 and checked the 
> threads and the JVM non-heap usage.
> 
> JVM non-heap usage reaches 3GB, threads go down, but some still linger around 
> and they are the following.
> 
> Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being used ) 
> 
> "metrics-meter-tick-thread-1" - Thread t@29024
>java.lang.Thread.State: TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at 
> 

Re: Dealing with Multiple sinks in Flink

2016-12-05 Thread Robert Metzger
Hi Vinay,

the JMX port depends on the port you've configured for the JMX metrics
reporter.
Did you configure it?

Regards,
Robert


On Fri, Dec 2, 2016 at 11:14 AM, vinay patil 
wrote:

> Hi Robert,
>
> I had resolved this issue earlier as I had not set the Kafka source
> parallelism to number of partitions, so I was getting the issue of window
> not getting triggered.
>
> Now I am facing the same issue, I tried to check the watermark value by
> using visualVM locally but I am not seeing that value there, I have
> attached
> the snapshot of visualVM
>  n4.nabble.com/file/n10412/jVisualVMMetrics.png>
>
> Just to verify , JMX port runs on 9010 by default , right ?, because when I
> tried to connect to it locally, I could not connect
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Dealing-with-
> Multiple-sinks-in-Flink-tp8643p10412.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink 1.1.3 OOME Permgen

2016-12-05 Thread Robert Metzger
I've submitted Wordcount 410 times to a testing cluster and a streaming job
290 times and I could not reproduce the issue with 1.1.3. Also, the
heapdump of one of the TaskManagers looked pretty normal.

Do you have any ideas how to reproduce the issue?

On Fri, Dec 2, 2016 at 3:21 PM, Robert Metzger  wrote:

> Thank you for reporting the issue Konstantin.
> I've filed a JIRA for the jackson issue: https://issues.apache.o
> rg/jira/browse/FLINK-5233.
> As I said in the JIRA, I propose to upgrade to Jackson 2.7.8, as this
> version contains the fix for the issue, but its not a major jackson upgrade.
>
> Any chance you could try to see if 2.7.8 fixes the issue as well?
>
>
> On Fri, Dec 2, 2016 at 11:12 AM, Fabian Hueske  wrote:
>
>> Hi Konstantin,
>>
>> Regarding 2): I've opened FLINK-5227 to update the documentation [1].
>>
>> Regarding the Row type: The Row type was introduced for flink-table and
>> was later used by other modules. There is FLINK-5186 to move Row and all
>> the related TypeInfo (+serializer and comparator) to flink-core [2]. That
>> should solve your issue.
>>
>> Some of the connector modules which provide TableSource and TableSinks
>> have dependencies on flink-table as well. I'll check that these are
>> optional dependencies to avoid that we pull in Calcite through connectors
>> for jobs that do not need it.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-5227
>> [2] https://issues.apache.org/jira/browse/FLINK-5186
>>
>> 2016-11-30 17:51 GMT+01:00 Konstantin Knauf > >:
>>
>>> Hi Stefan,
>>>
>>> unfortunately, I can not share any heap dumps with you. I was able to
>>> resolve some of the issues my self today, the root causes were different
>>> for different jobs.
>>>
>>> 1) Jackson 2.7.2 (which comes with Flink) has a known class loading
>>> issue (see https://github.com/FasterXML/jackson-databind/issues/1363).
>>> Shipping a shaded version of Jackson 2.8.4 with our user code helped. I
>>> recommend upgrading Flink's Jackson version soon.
>>>
>>> 2) We have a dependency on the flink-table [1] , which ships with
>>> Calcite including the Calcite JDBC Driver, which can not been collected
>>> cause of the known problem with the java.sql.DriverManager. Putting the
>>> flink-table in Flink's lib dir instead of shipping it with the user code
>>> helps. You should update the documentation, because this will always
>>> happen when using flink-table, I think. So I wonder, why this has not
>>> come up before actually.
>>>
>>> 3) Unresolved: Some Threads in a custom source which are not proberly
>>> shut down and keep references to the UserCodeClassLoader. I did not have
>>> time to look into this issue so far.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1] Side note: We only need flink-table for the "Row" class used in the
>>> JdbcOutputFormat, so it might make sense to move this class somewhere
>>> else. Naturally, we also tried to exclude the "transitive" dependency on
>>> org.apache.calcite until we noticed that calcite is packaged with
>>> flink-table, so that you can not even exclude it. What is the reasons
>>> for this?
>>>
>>>
>>>
>>>
>>> On 30.11.2016 00:55, Stefan Richter wrote:
>>> > Hi,
>>> >
>>> > could you somehow provide us a heap dump from a TM that run for a
>>> while (ideally, shortly before an OOME)? This would greatly help us to
>>> figure out if there is a classloader leak that causes the problem.
>>> >
>>> > Best,
>>> > Stefan
>>> >
>>> >> Am 29.11.2016 um 18:39 schrieb Konstantin Knauf <
>>> konstantin.kn...@tngtech.com>:
>>> >>
>>> >> Hi everyone,
>>> >>
>>> >> since upgrading to Flink 1.1.3 we observe frequent OOME Permgen
>>> Taskmanager Failures. Monitoring the permgen size on one of the
>>> Taskamanagers you can see that each Job (New Job and Restarts) adds a few
>>> MB, which can not be collected. Eventually, the OOME happens. This happens
>>> with all our Jobs, Streaming and Batch, on Yarn 2.4 as well as Stand-Alone.
>>> >>
>>> >> On Flink 1.0.2 this was not a problem, but I will investigate it
>>> further.
>>> >>
>>> >> The assumption is that Flink is somehow using one of the classes,
>>> which comes with our jar and by that prevents the gc of the whole class
>>> loader. Our Jars do not include any flink dependencies though
>>> (compileOnly), but of course many others.
>>> >>
>>> >> Any ideas anyone?
>>> >>
>>> >> Cheers and thank you,
>>> >>
>>> >> Konstantin
>>> >>
>>> >> sent from my phone. Plz excuse brevity and tpyos.
>>> >> ---
>>> >> Konstantin Knauf *konstantin.kn...@tngtech.com * +49-174-3413182
>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> >
>>> >
>>>
>>> --
>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. 

Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed

2016-12-05 Thread Robert Metzger
I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
best overview over the changes to the window operator between 1.1. and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> I forgot to mention : the watermark extractor is the one included in Flink
> API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI :
>
>> Hi robert,
>>
>> Yes, I am using the same code, just switching the version in pom.xml to
>> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
>> the time of the question). Here is the watermark assignment:
>>
>> .assignTimestampsAndWatermarks(new 
>> AscendingTimestampExtractor>()
>> {
>> @Override
>> public long extractAscendingTimestamp(Tuple3
>> tuple3) {
>> return tuple3.f0;
>> }
>> })
>>
>> Best,
>> Yassine
>>
>> 2016-12-05 11:24 GMT+01:00 Robert Metzger :
>>
>>> Hi Yassine,
>>> are you sure your watermark extractor is the same between the two
>>> versions. It sounds a bit like the watermarks for the 1.2 code are not
>>> generated correctly.
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>>> y.marzou...@mindlytix.com> wrote:
>>>
 Hi all,

 With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
 boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
 in memory and the windows results are not emitted until the whole stream is
 processed. Is this a temporary behaviour due to the developments in
 1.2-SNAPSHOT, or a bug?

 I am using a code similar to the following:

 env.setParallelism(1);

 DataStream sessions = env
 .readTextFile()
 .flatMap()
 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
 .keyBy(1)
 .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
 .apply().setParallelism(32)

 sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
 sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

 Best,
 Yassine

>>>
>>>
>>
>


Re: Resource under-utilization when using RocksDb state backend

2016-12-05 Thread Robert Metzger
Hi Cliff,

which Flink version are you using?
Are you using Eventtime or processing time windows?

I suspect that your disks are "burning" (= your job is IO bound). Can you
check with a tool like "iotop" how much disk IO Flink is producing?
Then, I would set this number in relation with the theoretical maximum of
your SSD's (a good rough estimate is to use dd for that).

If you find that your disk bandwidth is saturated by Flink, you could look
into tuning the RocksDB settings so that it uses more memory for caching.
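
As a starting point, a hedged sketch using the predefined option profiles of the
RocksDB backend (the profile names should be double-checked against the Flink
version in use; an OptionsFactory gives finer control over block cache and write
buffer sizes):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
// FLASH_SSD_OPTIMIZED is the SSD profile already referenced in this thread;
// SPINNING_DISK_OPTIMIZED_HIGH_MEM trades more memory for larger write buffers and caches.
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(backend);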

Regards,
Robert


On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick  wrote:

> In tests comparing RocksDb to fs state backend we observe much lower
> throughput, around 10x slower. While the lowered throughput is expected,
> what's perplexing is that machine load is also very low with RocksDb,
> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
> each running a single TaskManager in YARN, with 6.5G allocated memory per
> TaskManager. The instances also have 2x40G attached SSDs which we have
> mapped to `taskmanager.tmp.dir`.
>
> With FS state and 4 slots per TM, we will easily max out with an average
> load average around 5 or 6, so we actually need throttle down the slots to
> 3. With RocksDb using the Flink SSD configured options we see a load
> average at around 1. Also, load (and actual) throughput remain more or less
> constant no matter how many slots we use. The weak load is spread over all
> CPUs.
>
> Here is a sample top:
>
> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>  0.0%st
> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,
>  0.0%st
>
> Our pipeline uses tumbling windows, each with a ValueState keyed to a
> 3-tuple of one string and two ints. Each ValueState comprises a small set
> of tuples around 5-7 fields each. The WindowFunction simply diffs against
> the set and updates state if there is a diff.
>
> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>
> -Cliff
>
>
>
>
>
>
>


Re: JVM Non Heap Memory

2016-12-05 Thread Daniel Santos

Hello,

I have done some threads checking and dumps. And I have disabled the 
checkpointing.


Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster. 
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )


The dump shows that most threads are of 3 sources.

OutputFlusher --- 634 -- Sleeping State

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)


   Locked ownable synchronizers:
- None

Metrics --- 376 (Flink Metrics Reporter; it's the only metrics reporter being used) -- Parked State


"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None

tend -- 220 (Aerospike Client Thread) -- Sleeping State

 "tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None


I have 2 streaming jobs and a batch job that runs once in a while.

Streaming job A runs with a parallelism of 2 and uses Aerospike only in a
RichSink.


Streaming job B runs with a parallelism of 24 and uses Aerospike in a
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.


The batch job uses the Aerospike client in a RichFilterFunction / RichMapFunction
with open and close methods in order to open and close the client.
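
(For reference, the open/close pattern mentioned above looks roughly like the
sketch below; the class name, host, and port are placeholders, not the actual
job code.)

import com.aerospike.client.AerospikeClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class AerospikeEnrichSketch extends RichMapFunction<String, String> {

    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) {
        // One client per parallel task instance; "aerospike-host" is a placeholder.
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public String map(String value) {
        // ... look up / enrich the record using the client ...
        return value;
    }

    @Override
    public void close() {
        // Closing the client stops its background "tend" thread; if close()
        // is not reached, those threads keep running in the TaskManager JVM.
        if (client != null) {
            client.close();
        }
    }
}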


Next, I cancelled all the streaming jobs (@5/12/2016) and checked the
threads and the JVM non-heap usage.


JVM non-heap usage reaches 3GB, threads go down, but some still linger 
around and they are the following.


Metrics --- 790 (Flink Metrics Reporter; it's the only metrics reporter being used)


"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None


tend -- 432 (Aerospike Client Thread)


 "tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.aerospike.client.util.Util.sleep(Util.java:38)
at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None


The total number of threads is 1289 (total) / 1220 (tend + metrics). So I
have 1220 threads that I believe should be dead and not running,
since I have no jobs running at all.


And the JVM non-heap usage doesn't decrease at all after removing
every job.



Why the hell do the metrics threads grow without end?

I am using the following libs for metrics :

- metrics-graphite-3.1.0.jar

- metrics-core-3.1.0.jar

- flink-metrics-dropwizard-1.1.3.jar

- flink-metrics-graphite-1.1.3.jar

And the config for the reporter is:

metrics.reporters: 

Re: Flink CEP dynamic patterns

2016-12-05 Thread Abdallah Ghdiri
Thank you, I will investigate further.

On Mon, Dec 5, 2016 at 10:36 AM, Till Rohrmann 
wrote:

> Hi Abdallah,
>
> I've answered your question on SO. For the sake of completeness here is a
> copy:
>
> At the moment Flink's CEP library does not support dynamic pattern changes
> out of the box. Thus, once you've defined your pattern and started your
> job, it will only process this defined pattern.
>
> However, you can write your own operator implementing the
> TwoInputStreamOperator interface which receives on one input pattern
> definitions and on the other input the stream records (similar to a
> CoFlatMap function). For every new pattern you would then have to compile a
> new NFA on the operator and feed any new incoming stream elements to this
> NFA as well. That way, you could achieve your intended behaviour.
>
> In the future, we will most likely add this feature to Flink's CEP library.
>
> Cheers,
> Till
>
> On Sun, Dec 4, 2016 at 11:47 PM, Abdallah Ghdiri 
> wrote:
>
>> As suggested by Matthias, I am going to post my inquiry to the main
>> thread. Can you please take a look at my question over at Stack Overflow
>> and see if you have an answer? It's quite an important aspect of an ongoing
>> project: http://stackoverflow.com/questions/40935714/is-it-possible-to-add-new-patterns-in-flink-cep-after-calling-execute
>>
>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Yassine MARZOUGUI
I forgot to mention: the watermark extractor is the one included in the Flink
API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI :

> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
> the time of the question). Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor>()
> {
> @Override
> public long extractAscendingTimestamp(Tuple3
> tuple3) {
> return tuple3.f0;
> }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger :
>
>> Hi Yassine,
>> are you sure your watermark extractor is the same between the two
>> versions? It sounds a bit like the watermarks for the 1.2 code are not
>> generated correctly.
>>
>> Regards,
>> Robert
>>
>>
>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
>>> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
>>> in memory and the window results are not emitted until the whole stream is
>>> processed. Is this a temporary behaviour due to the developments in
>>> 1.2-SNAPSHOT, or a bug?
>>>
>>> I am using code similar to the following:
>>>
>>> env.setParallelism(1);
>>>
>>> DataStream sessions = env
>>> .readTextFile()
>>> .flatMap()
>>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>> .keyBy(1)
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>> .apply().setParallelism(32)
>>>
>>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Yassine MARZOUGUI
Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor>() {
@Override
public long extractAscendingTimestamp(Tuple3
tuple3) {
return tuple3.f0;
}
})
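
For reference, a self-contained version of that assignment could look like the
sketch below; the Tuple3 field types other than f0 are assumptions, since only
the long timestamp field is shown here:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class TimestampAssignmentSketch {

    public static DataStream<Tuple3<Long, String, String>> withTimestamps(
            DataStream<Tuple3<Long, String, String>> input) {
        return input.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
                        // f0 holds the (ascending) event-time timestamp in milliseconds.
                        return tuple3.f0;
                    }
                });
    }
}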

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger :

> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
>> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
>> in memory and the window results are not emitted until the whole stream is
>> processed. Is this a temporary behaviour due to the developments in
>> 1.2-SNAPSHOT, or a bug?
>>
>> I am using code similar to the following:
>>
>> env.setParallelism(1);
>>
>> DataStream sessions = env
>> .readTextFile()
>> .flatMap()
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>> .keyBy(1)
>> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>> .apply().setParallelism(32)
>>
>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>
>> Best,
>> Yassine
>>
>
>


Re: separation of JVMs for different applications

2016-12-05 Thread Till Rohrmann
The pro for the multi-tenant cluster mode is that you can share data
between jobs and you don't have to spin up a new cluster for each job. This
might be helpful for scenarios where you want to run many short-lived and
light-weight jobs.

But the important part is that you don't have to use this method. You can
also start a new Flink cluster per job which will then execute the job
isolated from any other jobs (given that you don't submit other jobs to
this cluster).

Cheers,
Till

On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang  wrote:

> Thanks Fabian and Till.
>
> We have customers who are interested in using Flink but very concerned
> that "multiple jobs share the same set of TMs". I've just joined the
> community recently so I'm not sure whether there has been a discussion over
> the "multi-tenant cluster mode" before.
>
> The cons are that one job/user's failure may crash another, which is
> unacceptable in a multi-tenant scenario.
> What are the pros? Do the pros outweigh the cons?
>
> Manu
>
> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann  wrote:
>
>> Hi Manu,
>>
>> with Flip-6 we will be able to support stricter application isolation by
>> starting for each job a dedicated JobManager which will execute its tasks
>> on TM reserved solely for this job. But at the same time we will continue
>> supporting the multi-tenant cluster mode where tasks belonging to multiple
>> jobs share the same set of TMs and, thus, might share information between
>> them.
>>
>> Cheers,
>> Till
>>
>> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske  wrote:
>>
>> Hi Manu,
>>
>> As far as I know, there are not plans to change the stand-alone
>> deployment.
>> FLIP-6 is focusing on deployments via resource providers (YARN, Mesos,
>> etc.) which allow to start Flink processes per job.
>>
>> Till (in CC) is more familiar with the FLIP-6 effort and might be able to
>> add more detail.
>>
>> Best,
>> Fabian
>>
>> 2016-12-01 4:16 GMT+01:00 Manu Zhang :
>>
>> Hi all,
>>
>> It seems tasks of different Flink applications can end up in the same JVM
>> (TaskManager) in standalone mode. Isn't this fragile since errors in one
>> application could crash another? I checked FLIP-6
>> but didn't find any mention of changing it in the future.
>>
>> Any thoughts or have I missed anything ?
>>
>> Thanks,
>> Manu Zhang
>>
>>
>>
>>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Robert Metzger
Hi Yassine,
are you sure your watermark extractor is the same between the two versions?
It sounds a bit like the watermarks for the 1.2 code are not generated
correctly.

Regards,
Robert


On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI  wrote:

> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
> in memory and the window results are not emitted until the whole stream is
> processed. Is this a temporary behaviour due to the developments in
> 1.2-SNAPSHOT, or a bug?
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream sessions = env
> .readTextFile()
> .flatMap()
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
> .keyBy(1)
> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
> .apply().setParallelism(32)
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine
>


Re: microsecond resolution

2016-12-05 Thread Kostas Kloudas
Hi Jeff,

Actually in Flink timestamps are simple longs.
This means that you can assign anything you want as a timestamp, as long as it 
fits in a long.
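
For example, a timestamp assigner that simply passes microsecond values through
could look like the sketch below (the Tuple2 element type and field layout are
assumptions for illustration). One caveat: window sizes such as Time.minutes(5)
are converted to milliseconds internally, so with microsecond timestamps the
window lengths have to be scaled to the same microsecond unit by hand.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class MicrosecondTimestampExtractor
        extends AscendingTimestampExtractor<Tuple2<Long, String>> {

    @Override
    public long extractAscendingTimestamp(Tuple2<Long, String> event) {
        // f0 already holds microseconds since the epoch; Flink just treats
        // the value as an opaque long.
        return event.f0;
    }
}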

Hope this helps and if not, we can discuss to see if we can find a solution 
that 
fits your needs together.

Cheers,
Kostas

> On Dec 4, 2016, at 11:39 PM, jeff jacobson wrote:
> 
> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most (all?) 
> U.S. equity and European futures, options, and stock markets timestamp in 
> microseconds. This makes Flink unusable for a massive industry vertical. To 
> the extent lower-frequency time-series data is being used (e.g. end-of-day 
> prices), stream processing is kind of overkill. Love everything I've read 
> about Flink...there's got to be a way to make this work, no?
> 
> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax wrote:
> 
> Oh. My bad... Did not read your question carefully enough.
> 
> Then the answer is no, it does not support microseconds (only
> milliseconds).
> 
> -Matthias
> 
> 
> On 12/4/16 2:22 PM, jeff jacobson wrote:
> > Sorry if I'm missing something. That link mentions milliseconds,
> > no? My question is whether or not I can specify microseconds where
> > 1000 microseconds = 1 millisecond. Thanks!
> >
> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax wrote:
> >
> > Yes. It does.
> >
> > See:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
> >
> > "Both timestamps and watermarks are specified as millliseconds
> > since the Java epoch of 1970-01-01T00:00:00Z."
> >
> >
> >
> > -Matthias
> >
> >
> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
> >> I've scoured Stack Overflow, the docs, and the web but I can't
> >> figure out: does Flink support microsecond timestamp resolution?
> >> Thanks!
> >
> >
> 



Re: Flink CEP dynamic patterns

2016-12-05 Thread Till Rohrmann
Hi Abdallah,

I've answered your question on SO. For the sake of completeness here is a
copy:

At the moment Flink's CEP library does not support dynamic pattern changes
out of the box. Thus, once you've defined your pattern and started your
job, it will only process this defined pattern.

However, you can write your own operator implementing the
TwoInputStreamOperator interface which receives on one input pattern
definitions and on the other input the stream records (similar to a
CoFlatMap function). For every new pattern you would then have to compile a
new NFA on the operator and feed any new incoming stream elements to this
NFA as well. That way, you could achieve your intended behaviour.
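
A much-simplified sketch of that two-input idea, using a plain CoFlatMapFunction
and a substring check as a stand-in for a compiled NFA (all names here are made
up for illustration):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DynamicRuleSketch implements CoFlatMapFunction<String, String, String> {

    // Stand-in for the "current pattern": here just a substring to match.
    private String currentRule = "";

    @Override
    public void flatMap1(String ruleUpdate, Collector<String> out) {
        // New rule/pattern definitions arrive on the first input.
        currentRule = ruleUpdate;
    }

    @Override
    public void flatMap2(String event, Collector<String> out) {
        // Stream records arrive on the second input and are checked against
        // the rule that is currently active.
        if (!currentRule.isEmpty() && event.contains(currentRule)) {
            out.collect(event);
        }
    }
}

In a real job the rule stream would typically be broadcast so that every
parallel instance sees every pattern update, e.g.
ruleStream.broadcast().connect(eventStream).flatMap(new DynamicRuleSketch()),
and the substring check would be replaced by feeding elements into the freshly
compiled NFA.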

In the future, we will most likely add this feature to Flink's CEP library.

Cheers,
Till

On Sun, Dec 4, 2016 at 11:47 PM, Abdallah Ghdiri 
wrote:

> As suggested by Matthias, I am going to post my inquiry to the main thread.
> Can you please take a look at my question over at Stack Overflow and see if
> you have an answer? It's quite an important aspect of an ongoing project:
> http://stackoverflow.com/questions/40935714/is-it-possible-to-add-new-patterns-in-flink-cep-after-calling-execute
>


Re: Update avro to 1.7.7 or later for flink 1.1.4

2016-12-05 Thread Timo Walther

Hi David,

thanks for looking into this. Since you already looked into this issue 
and solved/tested your fix, it would be great if you could open a pull 
request for it. Every contribution is very welcome.


Regards,
Timo


On 04/12/16 at 23:35, Torok, David wrote:


I spent close to two days and tracked down the solution to a major 
issue with Avro / GenericRecord and Flink.  In short, there is a field 
marked ‘transient’ in Avro 1.7.6 and earlier which interferes with 
correct Kryo serialization.  This was fixed in Avro 1.7.7, but Flink 
is dependent on Avro 1.7.6 in its POM.xml file.


I recorded the root cause and solution in JIRA 
https://issues.apache.org/jira/browse/FLINK-5039 



However, the issue is marked as ‘minor’. I'd like it to get a little more 
attention, and hopefully the Avro version can be updated for Flink 1.1.4?


In the meantime, I have created a custom Flink distribution Jar 
containing the Avro 1.7.7 classfiles and it is working perfectly for 
me now.


Best Regards,

Dave Torok