Re: flink/cancel & shutdown hooks

2017-03-08 Thread Timo Walther

Hi Dominik,

did you take a look into the logs? Maybe the exception is not shown in 
the CLI but in the logs.


Timo

Am 07/03/17 um 23:58 schrieb Dominik Safaric:

Hi all,

I would appreciate any help or advice regarding default Java runtime 
shutdown hooks and canceling Flink jobs.

Namely, as part of my Flink application I am using a Kafka interceptor class that defines 
a shutdown hook thread. When stopping the Flink streaming job on my local machine the 
shutdown hook gets executed; however, I do not see the same behaviour when stopping 
the Flink application using bin/flink cancel .

Considering there are no exceptions thrown from the shutdown thread, what could 
the root cause of this be?

Thanks,
Dominik





Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-08 Thread Timo Walther

Hi Yassine,

have you thought about using a ListState? As far as I know, it keeps at 
least the insertion order. You could sort it once your trigger event has 
arrived.
If you use RocksDB as the state backend, 100+ GB of state should not be a 
problem. Have you thought about using Flink's CEP library? It might fit 
your needs without implementing a custom process function.
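
To sketch the ListState idea (untested, and EventA, its timestamp getter and
the timer bookkeeping are hypothetical placeholders for your actual types),
buffering in a keyed process function could look roughly like this:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// RichProcessFunction in Flink 1.2; in later versions ProcessFunction itself is rich
public class BufferingLabeler extends RichProcessFunction<EventA, EventA> {

    private transient ListState<EventA> buffer;

    @Override
    public void open(Configuration parameters) {
        // keyed list state: appending is cheap, no sorting on the write path
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", EventA.class));
    }

    @Override
    public void processElement(EventA event, Context ctx, Collector<EventA> out) throws Exception {
        buffer.add(event);
        // register/update the event-time timer for the trigger event here, as before
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<EventA> out) throws Exception {
        // sort only once, when the trigger event has arrived
        List<EventA> events = new ArrayList<>();
        for (EventA e : buffer.get()) {
            events.add(e);
        }
        Collections.sort(events, new Comparator<EventA>() {
            @Override
            public int compare(EventA a, EventA b) {
                return Long.compare(a.getTimestamp(), b.getTimestamp());
            }
        });
        // label and emit the events older than the timer timestamp, write the rest back
    }
}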


I hope that helps.

Timo


Am 07/03/17 um 19:23 schrieb Yassine MARZOUGUI:

Hi all,

I want to label events in a stream based on a condition on some future 
events.
For example my stream contains events of type A and B, and I would 
like to assign a label 1 to an event E of type A if an event of type B 
happens within a duration x of E. I am using event time and my events 
can be out of order.
For this I'm using ProcessFunction which looks suitable for my use 
case. In order to handle out of order events, I'm keeping events of 
type A in a state and once an event of type B is received, I fire an 
event time timer in which I loop through events of type A in the state 
having a timestamps < timer.timestamp, label them and remove them from 
the state.
Currently the state is simply a value state containing a 
TreeMap. I'm keeping events sorted in order to 
effectively get events older than the timer timestamp.
I wonder if that's the appropriate data structure to use in the value 
state to buffer events and be able to handle out-of-orderness, or if 
there is a more efficient implementation, especially since the state 
may sometimes grow to ~100 GB.


Any insight is appreciated.

Thanks,
Yassine







Re: window function not working when control stream broadcast

2017-03-08 Thread Timo Walther

Hi Sam,

could you explain the behavior a bit more? How does the window function 
behave? Is it not triggered or what is the content? What is the result 
if you don't use a window function?


Timo


Am 08/03/17 um 02:59 schrieb Sam Huang:

btw, the reduce function works well, I've printed out the data, and they are
all correct. So are the timestamps and watermarks. And if I remove
".broadcast()", the data is successfully sinked.

Any help?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/window-function-not-working-when-control-stream-broadcast-tp12093p12094.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Integrate Flink with S3 on EMR cluster

2017-03-08 Thread vinay patil
Hi,

@Shannon - I am not facing any issue while writing to S3; I was getting
NoClassDef errors when reading the file from S3.

"Hadoop File System" - I mean I am using the FileSystem class of Hadoop to read
the file from S3.

@Stephan - I tried with 1.1.4 and was getting the same issue.

The easiest way I found is to run the "hadoop classpath" command and paste
its value into the export HADOOP_CLASSPATH variable.

This way we don't have to copy any S3-specific jars to the Flink lib folder.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Integrate-Flink-with-S3-on-EMR-cluster-tp5894p12101.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink checkpointing gets stuck

2017-03-08 Thread Ufuk Celebi
Added this JIRA https://issues.apache.org/jira/browse/FLINK-5993 to
track this. Would be great to comment there if you have any other
issues that should be covered in an Azure deployment section.


On Tue, Mar 7, 2017 at 7:10 PM, Stephan Ewen  wrote:
> Great to hear it!
>
> What do you think about adding a section to the Flink docs about deployment
> on Azure (there are already AWS and GCE, so Azure would make the
> cloud-trinity complete) that explains how to set this up and avoid such
> pitfalls?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>
> If there is anything you think can be improved in the dependencies to help
> there, please let us know!
>
>
> On Tue, Mar 7, 2017 at 5:31 PM, Avihai Berkovitz
>  wrote:
>>
>> For anyone seeing this thread in the future, we managed to solve the
>> issue. The problem was in the Azure storage SDK.
>>
>> Flink is using Hadoop 2.7, so we used version 2.7.3 of the Hadoop-azure
>> package. This package uses version 2.0.0 of the azure-storage package, dated
>> from 2014. It has several bugs that were since fixed, specifically one where
>> the socket timeout was infinite. We updated this package to version 5.0.0
>> and everything is working smoothly now.
>>
>>
>>
>> From: Stephan Ewen [mailto:se...@apache.org]
>> Sent: Sunday, February 26, 2017 4:47 PM
>> To: user@flink.apache.org
>> Subject: Re: Flink checkpointing gets stuck
>>
>>
>>
>> Thanks!
>>
>>
>>
>> This looks like a bigger example, involving MongoDB, etc.
>>
>> Are you able to reproduce this issue with a smaller example?
>>
>>
>>
>> It would also help to understand the problem better if we knew the
>> topology a bit better.
>>
>> The stack traces look like "phase 1&2" want to send data (but are back
>> pressured) and "phase 3&4&5" wait for input data.
>>
>>
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Sun, Feb 26, 2017 at 12:30 PM, Shai Kaplan 
>> wrote:
>>
>> Running jstack on one of the Task Managers:
>>
>>
>>
>> 2017-02-26 10:06:27
>>
>> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed
>> mode):
>>
>>
>>
>> "Attach Listener" #6414 daemon prio=9 os_prio=0 tid=0x7f3c8c089000
>> nid=0xe692 waiting on condition [0x]
>>
>>java.lang.Thread.State: RUNNABLE
>>
>>
>>
>> "Async calls on Sink: phase 5 (32/48)" #2337 daemon prio=5 os_prio=0
>> tid=0x7f3b942fc000 nid=0xb0d5 waiting on condition [0x7f3adf0af000]
>>
>>java.lang.Thread.State: WAITING (parking)
>>
>>   at sun.misc.Unsafe.park(Native Method)
>>
>>   - parking to wait for  <0x7f3d9d000620> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>
>>   at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>
>>   at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>   at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>>   at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> "Async calls on Sink: phase 5 (31/48)" #2336 daemon prio=5 os_prio=0
>> tid=0x7f3b942fb000 nid=0xb0d4 waiting on condition [0x7f3adf1b]
>>
>>java.lang.Thread.State: WAITING (parking)
>>
>>   at sun.misc.Unsafe.park(Native Method)
>>
>>   - parking to wait for  <0x7f3d9fbd7e70> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>
>>   at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>
>>   at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>
>>   at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>>   at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> "Async calls on Sink: phase 5 (30/48)" #2335 daemon prio=5 os_prio=0
>> tid=0x7f3b942f9800 nid=0xb0d3 waiting on condition [0x7f3adf2b1000]
>>
>>java.lang.Thread.State: WAITING (parking)
>>
>>   at sun.misc.Unsafe.park(Native Method)
>>
>>   - parking to wait for  <0x7f3da07cdde8> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>
>>   at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>
>>   

Starting flink HA cluster with start-cluster.sh

2017-03-08 Thread Dawid Wysakowicz
Hi,

I've tried to start a cluster in HA mode as described in the docs, but with
the current state of bin/config.sh I failed.

I think there is a bug with configuring the HIGH_AVAILABILITY variable in
this block (bin/config.sh):

if [ -z "${HIGH_AVAILABILITY}" ]; then
 HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} ""
"${YAML_CONF}")
 if [ -z "${HIGH_AVAILABILITY}" ]; then
# Try deprecated value
DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
if [ -z "${DEPRECATED_HA}" ]; then
HIGH_AVAILABILITY="none"
elif [ ${DEPRECATED_HA} == "standalone" ]; then
# Standalone is now 'none'
HIGH_AVAILABILITY="none"
else
HIGH_AVAILABILITY=${DEPRECATED_HA}
fi
 else
 HIGH_AVAILABILITY="none"
 fi
fi

if value "zookeeper" is read from config file the variable will be reset to
"none" with the else branch.

I just want to confirm it is a bug before filing a JIRA.

Regards
Dawid


Isolate Tasks - Run Distinct Tasks in Different Task Managers

2017-03-08 Thread PedroMrChaves
Hello,

Assuming that I have the following Job Graph,
   
(Source) -> (map) -> (KeyBy | Window | apply) -> (Sink)

Is there a way to assure that the map operator (and all its subtasks) run on
a different
task manager than the operator (map | window | apply)?

This would allow JVM memory isolation without using YARN.

Regards,
Pedro





-
Best Regards,
Pedro Chaves
--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Isolate-Tasks-Run-Distinct-Tasks-in-Different-Task-Managers-tp12104.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink 1.2 Proper Packaging of flink-table with SBT

2017-03-08 Thread Timo Walther

Hi Justin,

thank you for reporting your issues. I never tried the Table API with 
SBT, but `flink-table` should not declare dependencies on core modules; 
this is only done in `test` scope. Maybe you have to specify the right 
scope manually? You are right, the mentioned Jira should be fixed asap; 
I added it to my personal TODO list. Regarding the missing Janino 
artifacts in the Jar file, I created a Jira issue 
(https://issues.apache.org/jira/browse/FLINK-5994). This is very strange 
as it is actually a dependency of flink-table.


Thanks again for the feedback. If you experience any further issues with 
the Table API feel free to post them here.


Regards,
Timo


Am 08/03/17 um 04:50 schrieb Justin Yan:
Of course, 15 minutes after I give up and decide to email the mailing 
list, I figure it out - my flink App was using the 
CollectionsEnvironment and not the proper RemoteEnvironment.


It is still the case, however, that the `flink-table` JAR built by the 
standard commands doesn't include the dependencies it requires, and so 
I'd be curious to hear what the proper procedure is for linking 
against `flink-table` if you want to avoid the bug I highlighted in 
the aforementioned JIRA.


Thank you and sorry for the extra noise!

Justin

On Tue, Mar 7, 2017 at 7:21 PM, Justin Yan wrote:


Hello!

We are attempting to use the Flink Table API, but are running into
a few issues.

We initially started with our dependencies looking something like
this:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"   % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-clients" % "1.2.0" % "provided",
  "org.apache.flink" %% "flink-table"   % "1.2.0",
  Libraries.specs2,
  ...)

However, this is mildly annoying since flink-table declares
dependencies on the flink core modules, and thus brings everything
in /anyway/.  On top of that, we saw this JIRA:
https://issues.apache.org/jira/browse/FLINK-5227
, which we found
concerning, so we decided to follow the advice - we downloaded and
built Flink-1.2 from source (java 7, mvn 3.3) using the following
(We're also using the Kinesis connector):

tools/change-scala-version.sh 2.11
mvn clean install -Pinclude-kinesis -DskipTests
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

Once this was done, we took the JAR in
"/flink-libraries/flink-table/target/" and copied it over to the
taskManager "/lib" directory. Finally, we marked our `flink-table`
dependency as "provided".  Everything compiles, but when I try to
actually run a simple job, I get the following error at runtime:

java.lang.NoClassDefFoundError:
org/codehaus/commons/compiler/CompileException

Indeed, when I peek inside of the `flink-table` JAR, I can't find
that particular package (and similarly, it isn't in the flink-dist
JAR either)

$ jar tf flink-table_2.11-1.2.0.jar | grep codehaus
$

I then attempted to include this library in my user code by adding:

"org.codehaus.janino" %"janino" %"3.0.6",

to my list of dependencies.  When I run a `jar tf myjar.jar | grep
CompileException` - I see the class. However, when I run my flink
application in this fat JAR, I /continue to get the same error/,
even though I am positive this class is included in the fat JAR. 
I eventually got around this by placing this jar in the `flink/lib`
directory, but I am very confused as to how this class
cannot be found when I have included it in the fat JAR that I am
submitting with the Flink CLI to a YARN cluster.  I mostly wanted
to mention this in case it is a bug, but mostly to see if anyone
else has had trouble with the Table API, and if not, if I have
structured my project incorrectly to cause these troubles.

Thanks!

Justin








Remove Accumulators at runtime

2017-03-08 Thread PedroMrChaves
Hello,

We can add an accumulator using the following call:
getRuntimeContext().addAccumulator(NAME, ACCUMULATOR);

Is there a way to remove the added accumulators at runtime? 

Regards,
Pedro Chaves



-
Best Regards,
Pedro Chaves
--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Remove-Accumulators-at-runtime-tp12106.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Starting flink HA cluster with start-cluster.sh

2017-03-08 Thread Ufuk Celebi
Shouldn't the else branch

```
else
   HIGH_AVAILABILITY=${DEPRECATED_HA}
fi
```

set it to `zookeeper`? Of course, the truth is whatever the script
execution prints out. ;-)

PS Emails like this should either go to the dev list or it's also fine
to open an issue and discuss there (and potentially close as Not a
Problem if it is not an issue after all).



On Wed, Mar 8, 2017 at 10:37 AM, Dawid Wysakowicz
 wrote:
> Hi,
>
> I've tried to start cluster with HA mode as described in the doc, but with a
> current state of bin/config.sh I failed.
>
> I think there is a bug with configuring the HIGH_AVAILABILITY variable in
> block (bin/config.sh):
>
> if [ -z "${HIGH_AVAILABILITY}" ]; then
>     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
>     if [ -z "${HIGH_AVAILABILITY}" ]; then
>         # Try deprecated value
>         DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
>         if [ -z "${DEPRECATED_HA}" ]; then
>             HIGH_AVAILABILITY="none"
>         elif [ ${DEPRECATED_HA} == "standalone" ]; then
>             # Standalone is now 'none'
>             HIGH_AVAILABILITY="none"
>         else
>             HIGH_AVAILABILITY=${DEPRECATED_HA}
>         fi
>     else
>         HIGH_AVAILABILITY="none"
>     fi
> fi
>
> if value "zookeeper" is read from config file the variable will be reset to
> "none" with the else branch.
>
> I just want to confirm it is a bug before filing a JIRA.
>
> Regards
> Dawid


Re: Remove Accumulators at runtime

2017-03-08 Thread Ufuk Celebi
Hey Pedro! No, this is not possible. What is your use case for this?


On Wed, Mar 8, 2017 at 10:52 AM, PedroMrChaves
 wrote:
> Hello,
>
> We can add an accumulator using the following call:
> getRuntimeContext().addAccumulator(NAME, ACCUMULATOR);
>
> Is there a way to remove the added accumulators at runtime?
>
> Regards,
> Pedro Chaves
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Remove-Accumulators-at-runtime-tp12106.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.


Re: AWS exception serialization problem

2017-03-08 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Just to clarify:

From the error trace, it seems that the messages fetched from Kafka are 
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon

Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi,

We have seen something similar in Flink 1.2. We have an operation that
parses some JSON, and when it fails to parse it, we can see the
ClassNotFoundException for the relevant exception (in our case
JsResultException from the play-json library). The library is indeed in the
shaded JAR, otherwise we would not be able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai  wrote:

> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems like that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>


Re: Isolate Tasks - Run Distinct Tasks in Different Task Managers

2017-03-08 Thread Ufuk Celebi
Internally, Flink defines through SlotSharingGroup which tasks may
share a task manager slot. By configuring each TaskManager to have a
single slot and configuring the slot sharing groups accordingly, you
can get the desired behaviour.

You can specify the slot sharing group for an operator like map by
calling slotSharingGroup(String). You would have to set a different
slot sharing group for the window/apply part.
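
A minimal sketch of what that could look like (Event, MySource, MyMapFunction,
MyWindowFunction and MySink are hypothetical placeholders), combined with
taskmanager.numberOfTaskSlots: 1 in flink-conf.yaml:

DataStream<Event> events = env.addSource(new MySource());

events
    .map(new MyMapFunction())
    .slotSharingGroup("map-group")       // map subtasks share slots only within this group
    .keyBy("key")
    .timeWindow(Time.minutes(1))
    .apply(new MyWindowFunction())
    .slotSharingGroup("window-group")    // window/apply subtasks go into a different group
    .addSink(new MySink());

With one slot per TaskManager, subtasks from "map-group" and "window-group"
can never share a slot and therefore never end up in the same TaskManager.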

Does this help?


On Wed, Mar 8, 2017 at 10:44 AM, PedroMrChaves
 wrote:
> Hello,
>
> Assuming that I have the following Job Graph,
>
> (Source) -> (map) -> (KeyBy | Window | apply) -> (Sink)
>
> Is there a way to assure that the map operator (and all its subtasks) run on
> a different
> task manager than the operator (map | window | apply)?
>
> This would allow JVM memory isolation without using YARN.
>
> Regards,
> Pedro
>
>
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Isolate-Tasks-Run-Distinct-Tasks-in-Different-Task-Managers-tp12104.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.


Re: TTL for State Entries / FLINK-3089

2017-03-08 Thread Ufuk Celebi
Looping in Aljoscha and Kostas who are the experts on this. :-)

On Mon, Mar 6, 2017 at 6:06 PM, Johannes Schulte
 wrote:
> Hi,
>
> I am trying to achieve a stream-to-stream join with big windows and am
> searching for a way to clean up state of old keys. I am already using a
> RichCoProcessFunction
>
> I found there is already an existing ticket
>
> https://issues.apache.org/jira/browse/FLINK-3089
>
> but I have doubts that a registration of a timer for every incoming event is
> feasible as the timers seem to reside in an in-memory queue.
>
> The task is somewhat similar to the following blog post:
> http://devblog.mediamath.com/real-time-streaming-attribution-using-apache-flink
>
> Is the implementation of a custom window operator a necessity for achieving
> such functionality?
>
> Thanks a lot,
>
> Johannes
>
>


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Dominik Safaric
I’m deploying the job from the master node of the cluster itself using 
bin/flink run -c   . 

The cluster consists of 4 workers and a master node. 

Dominik

> On 8 Mar 2017, at 15:16, Ufuk Celebi  wrote:
> 
> How are you deploying your job?
> 
> Shutdown hooks are executed when the JVM terminates whereas the cancel
> command only cancels the Flink job and the JVM process potentially
> keeps running. For example, running a standalone cluster would keep
> the JVMs running.
> 
> On Wed, Mar 8, 2017 at 9:36 AM, Timo Walther  wrote:
>> Hi Dominik,
>> 
>> did you take a look into the logs? Maybe the exception is not shown in the
>> CLI but in the logs.
>> 
>> Timo
>> 
>> Am 07/03/17 um 23:58 schrieb Dominik Safaric:
>> 
>>> Hi all,
>>> 
>>> I would appreciate for any help or advice in regard to default Java
>>> runtime shutdown hooks and canceling Flink jobs.
>>> 
>>> Namely part of my Flink application I am using a Kafka interceptor class
>>> that defines a shutdown hook thread. When stopping the Flink streaming job
>>> on my local machine the shutdown hook gets executed, however I do not see
>>> the same behaviour when stopping the Flink application using bin/flink
>>> cancel .
>>> 
>>> Considering there are no exceptions thrown from the shutdown thread, what
>>> could the root cause of this be?
>>> 
>>> Thanks,
>>> Dominik
>> 
>> 
>> 



Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
How are you deploying your job?

Shutdown hooks are executed when the JVM terminates whereas the cancel
command only cancels the Flink job and the JVM process potentially
keeps running. For example, running a standalone cluster would keep
the JVMs running.

On Wed, Mar 8, 2017 at 9:36 AM, Timo Walther  wrote:
> Hi Dominik,
>
> did you take a look into the logs? Maybe the exception is not shown in the
> CLI but in the logs.
>
> Timo
>
> Am 07/03/17 um 23:58 schrieb Dominik Safaric:
>
>> Hi all,
>>
>> I would appreciate for any help or advice in regard to default Java
>> runtime shutdown hooks and canceling Flink jobs.
>>
>> Namely part of my Flink application I am using a Kafka interceptor class
>> that defines a shutdown hook thread. When stopping the Flink streaming job
>> on my local machine the shutdown hook gets executed, however I do not see
>> the same behaviour when stopping the Flink application using bin/flink
>> cancel .
>>
>> Considering there are no exceptions thrown from the shutdown thread, what
>> could the root cause of this be?
>>
>> Thanks,
>> Dominik
>
>
>


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
On Wed, Mar 8, 2017 at 3:19 PM, Dominik Safaric
 wrote:
> The cluster consists of 4 workers and a master node.

Are you starting the cluster via bin/start-cluster.sh or are you using
YARN etc.?


Re: flink/cancel & shutdown hooks

2017-03-08 Thread Dominik Safaric
I’m not using YARN; I am instead starting the cluster using 
bin/start-cluster.sh.

> On 8 Mar 2017, at 15:32, Ufuk Celebi  wrote:
> 
> On Wed, Mar 8, 2017 at 3:19 PM, Dominik Safaric
>  wrote:
>> The cluster consists of 4 workers and a master node.
> 
> Are you starting the cluster via bin/start-cluster.sh or are you using
> YARN etc.?



Re: flink/cancel & shutdown hooks

2017-03-08 Thread Ufuk Celebi
OK, the thing is that the JVMs are not shut down when you cancel the
task. Therefore no shut down hook is executed when you cancel.

You would have to execute bin/stop-cluster.sh to stop the JVM. Does
that make sense?
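
If the cleanup only has to run when the job stops (rather than when the JVM
exits), one alternative is to put it into a rich function's close() method,
which Flink also calls when the job is cancelled. A rough sketch, with
MyInterceptor.shutdown() as a hypothetical stand-in for the actual cleanup:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CleanupMapper extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        // set up the interceptor / external resources here
    }

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        // runs on bin/flink cancel as well, even though the TaskManager JVM keeps running
        MyInterceptor.shutdown();   // hypothetical cleanup call
    }
}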


On Wed, Mar 8, 2017 at 3:34 PM, Dominik Safaric
 wrote:
> I’m not using YARN but instead of starting the cluster using 
> bin/start-cluster.sh
>
>> On 8 Mar 2017, at 15:32, Ufuk Celebi  wrote:
>>
>> On Wed, Mar 8, 2017 at 3:19 PM, Dominik Safaric
>>  wrote:
>>> The cluster consists of 4 workers and a master node.
>>
>> Are you starting the cluster via bin/start-cluster.sh or are you using
>> YARN etc.?
>


Re: Isolate Tasks - Run Distinct Tasks in Different Task Managers

2017-03-08 Thread PedroMrChaves
Thanks for the response.

I would like to assure that the map operator is not in the same task manager
as the window/apply operator, regardless of the number of slots of each task
manager. 



-
Best Regards,
Pedro Chaves
--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Isolate-Tasks-Run-Distinct-Tasks-in-Different-Task-Managers-tp12104p12118.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Remove Accumulators at runtime

2017-03-08 Thread PedroMrChaves
Hi,

I'm building a system that maintains a set of rules that can be dynamically
added/removed. I wanted to count every element that matched each rule in an
accumulator (I have several parallel instances). If a rule is removed, so
should its accumulator be.





-
Best Regards,
Pedro Chaves
--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Remove-Accumulators-at-runtime-tp12106p12119.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink 1.2 Proper Packaging of flink-table with SBT

2017-03-08 Thread Justin Yan
Hi Timo,

Regarding the dependency issue, looking at flink-table's pom.xml, I believe
the issue is the dependency on flink-streaming-scala, which then
transitively depends on almost all of the core flink modules.  If it had
not been for the aforementioned JIRA issue about OOM errors, I probably
would've just declared my dependency in SBT like this and been done with it:

"org.apache.flink" %% "flink-table" % "1.2.0" exclude("org.apache.flink",
"flink-streaming-scala"),

As for the missing Janino artifacts, I agree it is strange as the
dependency is declared - and the JAR contained the calcite dependencies.  I
basically followed the standard "build from source" steps (as documented
here
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html),
and copied the flink-table JAR that was in the target/ directory.  If there
is trouble reproducing, I'm happy to provide a more detailed set-up.

Thanks!

Justin





On Wed, Mar 8, 2017 at 1:53 AM, Timo Walther  wrote:

> Hi Justin,
>
> thank you for reporting your issues. I never tried the Table API with SBT
> but `flink-table` should not declare dependencies to core modules, this is
> only done in `test` scope, maybe you have to specify the right scope
> manually? You are right, the mentioned Jira should be fixed asap, I added
> it to my personal TODO list. Regarding the missing Janino artifacts in the
> Jar file, I created a Jira issue (https://issues.apache.org/
> jira/browse/FLINK-5994). This is very strange as it actually a dependency
> of flink-table.
>
> Thanks again for the feedback. If you experience any further issues with
> the Table API feel free to post them here.
>
> Regards,
> Timo
>
>
> Am 08/03/17 um 04:50 schrieb Justin Yan:
>
> Of course, 15 minutes after I give up and decide to email the mailing
> list, I figure it out - my flink App was using the CollectionsEnvironment
> and not the proper RemoteEnvironment.
>
> It is still the case, however, that the `flink-table` JAR built by the
> standard commands doesn't include the dependencies it requires, and so I'd
> be curious to hear what the proper procedure is for linking against
> `flink-table` if you want to avoid the bug I highlighted in the
> aforementioned JIRA.
>
> Thank you and sorry for the extra noise!
>
> Justin
>
> On Tue, Mar 7, 2017 at 7:21 PM, Justin Yan  wrote:
>
>> Hello!
>>
>> We are attempting to use the Flink Table API, but are running into a few
>> issues.
>>
>> We initially started with our dependencies looking something like this:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-scala"   % "1.2.0" % "provided",
>>   "org.apache.flink" %% "flink-clients" % "1.2.0" % "provided",
>>   "org.apache.flink" %% "flink-table"   % "1.2.0",
>>   Libraries.specs2,
>>   ...)
>>
>> However, this is mildly annoying since flink-table declares dependencies
>> on the flink core modules, and thus brings everything in *anyway*.  On
>> top of that, we saw this JIRA: https://issues.apache.org/jira/browse/FLINK-5227,
>> which we found concerning, so we decided to
>> follow the advice - we downloaded and built Flink-1.2 from source (java 7,
>> mvn 3.3) using the following (We're also using the Kinesis connector):
>>
>> tools/change-scala-version.sh 2.11
>> mvn clean install -Pinclude-kinesis -DskipTests
>> cd flink-dist
>> mvn clean install -Pinclude-kinesis -DskipTests
>>
>> Once this was done, we took the JAR in "/flink-libraries/flink-table/target/"
>> and copied it over to the taskManager "/lib" directory.  Finally, we marked
>> our `flink-table` dependency as "provided".  Everything compiles, but when
>> I try to actually run a simple job, I get the following error at runtime:
>>
>> java.lang.NoClassDefFoundError: org/codehaus/commons/compiler/
>> CompileException
>>
>> Indeed, when I peek inside of the `flink-table` JAR, I can't find that
>> particular package (and similarly, it isn't in the flink-dist JAR either)
>>
>> $ jar tf flink-table_2.11-1.2.0.jar | grep codehaus
>> $
>>
>> I then attempted to include this library in my user code by adding:
>>
>> "org.codehaus.janino" % "janino" % "3.0.6",
>>
>> to my list of dependencies.  When I run a `jar tf myjar.jar | grep
>> CompileException` - I see the class. However, when I run my flink
>> application in this fat JAR, I *continue to get the same error*, even
>> though I am positive this class is included in the fat JAR.  I eventually
>> got around this by placing this jar in the `flink/lib` directory, but I am
>> very confused as to how this class cannot be found when I have included it
>> in the fat JAR that I am submitting with the Flink CLI to a YARN cluster.
>> I mostly wanted to mention this in case it is a bug, but mostly to see if
>> anyone else has had trouble with the Table API, and if not, if I have
>> structured my project incorrectly to cause these troubles.
>>
>> Thanks!
>>
>> Justin
>>
>>
>>
>>
>
>


Re: window function not working when control stream broadcast

2017-03-08 Thread Sam Huang
Hi Timo,

The window function sinks the data into InfluxDB, and it's not triggered.
If I comment out the ".timeWindow" and print the results after the reduce
function, it works.
The code for the window function is here:

private static class WindowFunImpl implements
        WindowFunction<KVTuple6, Point, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<KVTuple6> iterable,
                      Collector<Point> collector) throws Exception {
        KVTuple6 kvTypeTuple = iterable.iterator().next();
        System.out.println("window: " + kvTypeTuple);
        // Doesn't work here if use broadcast
        Point.Builder builder = Point.measurement(INFLUXDB_MEASUREMENT)
                .time(window.getStart(), TimeUnit.MILLISECONDS)
                .tag(TAG_DOMAIN, kvTypeTuple.f0)
                .tag(TAG_DEVICE, kvTypeTuple.f1)
                .tag(TAG_TYPE, kvTypeTuple.f2)
                .tag(TAG_KEY, kvTypeTuple.f3)
                .addField(FIELD, kvTypeTuple.f4);

        collector.collect(builder.build());
    }
}


On Wed, Mar 8, 2017 at 1:10 AM, Timo Walther  wrote:

> Hi Sam,
>
> could you explain the behavior a bit more? How does the window function
> behave? Is it not triggered or what is the content? What is the result if
> you don't use a window function?
>
> Timo
>
>
> Am 08/03/17 um 02:59 schrieb Sam Huang:
>
> btw, the reduce function works well, I've printed out the data, and they
>> are
>> all correct. So are the timestamps and watermarks. And if I remove
>> ".broadcast()", the data is successfully sinked.
>>
>> Any help?
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/window-function-not
>> -working-when-control-stream-broadcast-tp12093p12094.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>
>


Re: AWS exception serialization problem

2017-03-08 Thread Stephan Ewen
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda  wrote:

> Hi,
>
> We have seen something similar in Flink 1.2. We have an operation that
> parses some JSON, and when it fails to parse it, we can see the
> ClassNotFoundException for the relevant exception (in our case
> JsResultException from the play-json library). The library is indeed in the
> shaded JAR, otherwise we would not be able to parse the JSON.
>
> Cheers,
>
> Bruno
>
> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Shannon,
>>
>> Just to clarify:
>>
>> From the error trace, it seems like that the messages fetched from Kafka
>> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
>> `AmazonS3Exception` as records from FlinkKafkaConsumer?
>> Is this correct? If so, I think we should just make sure that the
>> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
>> user fat jar.
>>
>> Also, what is the Flink version you are using?
>>
>> Cheers,
>> Gordon
>>
>


Job completion or failure callback?

2017-03-08 Thread Shannon Carey
Hi,

Is there any way we can run a callback on job completion or failure without 
leaving the client running during job execution? For example, when we submit 
the job via the web UI the main() method's call to 
ExecutionEnvironment#execute() appears to by asynchronous with the job 
execution. Therefore, the execute() call returns before the job is completed. 
This is a bit confusing because the behavior is different when run from the IDE 
vs. run in a cluster, and because signature of the returned class 
JobExecutionResult implies that it can tell you how long execution took (it has 
getNetRuntime()). We would like to be able to detect job completion or failure 
so that we can monitor the success or failure of batch jobs, in particular, so 
that we can react to failures appropriately. It seems like the JobManager 
should be capable of executing callbacks like this. Otherwise, we'll have to 
create an external component that eg. polls the web UI/API for job status.
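
Until something like that exists, a rough sketch of such an external component
(host, port, and the naive JSON handling are assumptions; a real implementation
would parse the JSON properly) could poll the JobManager's monitoring REST API,
e.g. the /jobs/<jobid> endpoint, until the job reaches a terminal state:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class JobStatusWatcher {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];   // id of the submitted job
        URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId);   // hypothetical host/port

        while (true) {
            StringBuilder body = new StringBuilder();
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(url.openStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
            }
            String json = body.toString();
            // the job details contain a "state" field such as RUNNING, FINISHED or FAILED
            if (json.contains("\"state\":\"FINISHED\"")) {
                System.out.println("job completed");
                return;
            }
            if (json.contains("\"state\":\"FAILED\"") || json.contains("\"state\":\"CANCELED\"")) {
                System.out.println("job did not complete successfully");
                return;
            }
            Thread.sleep(10000);   // poll every 10 seconds
        }
    }
}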

Does the web UI run in the same JVM as the JobManager (when deployed in YARN)? 
If so, I would expect logs from the main method to appear in the JobManager 
logs. However, for some reason I can't find log messages or System.out  
messages when they are logged in the main() method after execute() is called. 
Why is that?
Edit: figured it out: OptimizerPlanEnvironment#execute() ends with "throw new 
ProgramAbortException()". Tricky and unexpected. That should definitely be 
mentioned in the javadocs of the execute() method! Even the documentation says, 
"The execute() method is returning a JobExecutionResult, this contains 
execution times and accumulator results." which isn't true, or at least isn't 
always true.

Thanks,
Shannon


Re: Connecting workflows in batch

2017-03-08 Thread Shannon Carey
It may not return for batch jobs, either. See my post 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Job-completion-or-failure-callback-td12123.html

In short, if Flink returned an OptimizerPlanEnvironment from your call to 
ExecutionEnvironment.getExecutionEnvironment, when you call execute() it will 
only generate the job plan (the job hasn't been submitted/isn't executing yet), 
and if no exceptions are thrown during creation of the job plan, then a 
ProgramAbortException is always thrown, and none of your code after execute() 
would run, and as a result you're definitely not able to use any 
JobExecutionResult in your main method, even though the code makes it look 
like you will.

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Friday, March 3, 2017 at 9:36 AM
To: <user@flink.apache.org>
Subject: Re: Connecting workflows in batch

Yes, right now that call never returns for a long-running streaming job. We 
will (in the future) provide a way for that call to return so that the result 
can be used for checking aggregators and other things.


On Thu, Mar 2, 2017, at 19:14, Mohit Anchlia wrote:
Does it mean that for streaming jobs it never returns?

On Thu, Mar 2, 2017 at 6:21 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Mohit,

StreamExecutionEnvironment.execute() will only return giving you the 
JobExecutionResult after the job has reached a final stage. If that works for 
you to schedule the second job, then it should be ok to combine both jobs in 
one program and execute the second job after the first one has completed.

Cheers,
Till


On Thu, Mar 2, 2017 at 2:33 AM, Mohit Anchlia <mohitanch...@gmail.com> wrote:
It looks like JobExecutionResult can be used here by using the accumulators?

On Wed, Mar 1, 2017 at 8:37 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
I think right now the best option is the JobManager REST interface: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html

You would have to know the ID of your job and then you can poll the status of 
your running jobs.

On Mon, 27 Feb 2017 at 18:15 Mohit Anchlia <mohitanch...@gmail.com> wrote:
What's the best way to track the progress of the job?

On Mon, Feb 27, 2017 at 7:56 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi Mohit,
I'm afraid there is nothing like this in Flink yet. As you mentioned you 
probably have to manually track the completion of one job and then trigger 
execution of the next one.

Best,
Aljoscha

On Fri, 24 Feb 2017 at 19:16 Mohit Anchlia <mohitanch...@gmail.com> wrote:
Is there a way to connect 2 workflows such that one triggers the other if 
certain condition is met? However, the workaround may be to insert a 
notification in a topic to trigger another workflow. The problem is that the 
addSink ends the flow so if we need to add a trigger after addSink there 
doesn't seem to be any good way of sending a notification to a queue that the 
batch processing is complete. Any suggestions? One option could be track the 
progress of a job and on a successful completion add a notification. Is there 
such a mechanism available?



Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)

On Wed, 8 Mar 2017, 21:41 Stephan Ewen,  wrote:

> @Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
>
> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda 
> wrote:
>
> Hi,
>
> We have seen something similar in Flink 1.2. We have an operation that
> parses some JSON, and when it fails to parse it, we can see the
> ClassNotFoundException for the relevant exception (in our case
> JsResultException from the play-json library). The library is indeed in the
> shaded JAR, otherwise we would not be able to parse the JSON.
>
> Cheers,
>
> Bruno
>
> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems like that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>
>
>


Re: Frequent Full GC's in case of FSStateBackend

2017-03-08 Thread saiprasad mishra
Hi All

I am also seeing issues with FsStateBackend as it stalls because of full GCs. We
have a very large state.
Does this mean the doc below should not claim that FsStateBackend is
encouraged for large state?

https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html#the-fsstatebackend

Regards
Sai

On Fri, Feb 10, 2017 at 6:19 AM, Stefan Richter  wrote:

> Async snapshotting is the default.
>
> Am 10.02.2017 um 14:03 schrieb vinay patil :
>
> Hi Stephan,
>
> Thank you for the clarification.
> Yes with RocksDB I don't see Full GC happening, also I am using Flink
> 1.2.0 version and I have set the statebackend in flink-conf.yaml file to
> rocksdb, so by default does this do asynchronous checkpointing or I have to
> specify it at the job level  ?
>
> Regards,
> Vinay Patil
>
> On Fri, Feb 10, 2017 at 4:16 PM, Stefan Richter [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Hi,
>>
>> FSStateBackend operates completely on-heap and only snapshots for
>> checkpoints go against the file system. This is why the backend is
>> typically faster for small states, but can become problematic for larger
>> states. If your state exceeds a certain size, you should strongly consider
>> to use RocksDB as backend. In particular, RocksDB also offers asynchronous
>> snapshots which is very valuable to keep stream processing running for
>> large state. RocksDB works on native memory/disk, so there is no GC to
>> observe. For cases in which your state fits in memory but GC is a problem
>> you could try using the G1 garbage collector which offers better
>> performance for the FSStateBackend than the default.
>>
>> Best,
>> Stefan
>>
>>
>> Am 10.02.2017 um 11:16 schrieb Vinay Patil <[hidden email]
>> >:
>>
>> Hi,
>>
>> I am doing performance test for my pipeline keeping FSStateBackend, I
>> have observed frequent Full GC's after processing 20M records.
>>
>> When I did memory analysis using MAT, it showed that the many objects
>> maintained by Flink state are live.
>>
>> Flink keeps the state in memory even after checkpointing , when does this
>> state gets removed / GC. (I am using window operator in which the DTO comes
>> as input)
>>
>> Also why does Flink keep the state in memory after checkpointing ?
>>
>> P.S Using RocksDB is not causing Full GC at all.
>>
>> Regards,
>> Vinay Patil
>>
>>
>>
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-
>> tp11564p11565.html
>> To start a new topic under Apache Flink User Mailing List archive., email 
>> [hidden
>> email]
>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>> NAML
>> 
>>
>
>
> --
> View this message in context: Re: Frequent Full GC's in case of
> FSStateBackend
> 
> Sent from the Apache Flink User Mailing List archive. mailing list archive
>  at
> Nabble.com.
>
>
>


Re: Frequent Full GC's in case of FSStateBackend

2017-03-08 Thread vinay patil
Hi Sai,

If you are sure that your state will not exceed the memory limit of the nodes,
then you should consider the FsStateBackend; otherwise you should go for RocksDB.

What is the configuration of your cluster?
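
For reference, a minimal sketch of enabling RocksDB at the job level (the
checkpoint URI is a hypothetical placeholder, and the flink-statebackend-rocksdb
dependency is assumed to be on the classpath); the same can be configured
cluster-wide via state.backend: rocksdb in flink-conf.yaml:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keeps working state in native memory / on disk instead of on the JVM heap
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // snapshots go against the checkpoint URI; as Stefan notes below,
        // asynchronous snapshotting is the default
        env.enableCheckpointing(60000);
        // ... define sources, operators and sinks, then call env.execute() as usual
    }
}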

On Mar 9, 2017 7:31 AM, "saiprasad mishra [via Apache Flink User Mailing
List archive.]"  wrote:

> Hi All
>
> I am also seeing issues with FsStateBackend as it stalls coz of full gc.
> We have very large state,
> Does this mean the below doc should not claim that FsStateBackend is
> encouraged for large state.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_
> backends.html#the-fsstatebackend
>
> Regards
> Sai
>
> On Fri, Feb 10, 2017 at 6:19 AM, Stefan Richter <[hidden email]
> > wrote:
>
>> Async snapshotting is the default.
>>
>> Am 10.02.2017 um 14:03 schrieb vinay patil <[hidden email]
>> >:
>>
>> Hi Stephan,
>>
>> Thank you for the clarification.
>> Yes with RocksDB I don't see Full GC happening, also I am using Flink
>> 1.2.0 version and I have set the statebackend in flink-conf.yaml file to
>> rocksdb, so by default does this do asynchronous checkpointing or I have to
>> specify it at the job level  ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Fri, Feb 10, 2017 at 4:16 PM, Stefan Richter [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> Hi,
>>>
>>> FSStateBackend operates completely on-heap and only snapshots for
>>> checkpoints go against the file system. This is why the backend is
>>> typically faster for small states, but can become problematic for larger
>>> states. If your state exceeds a certain size, you should strongly consider
>>> to use RocksDB as backend. In particular, RocksDB also offers asynchronous
>>> snapshots which is very valuable to keep stream processing running for
>>> large state. RocksDB works on native memory/disk, so there is no GC to
>>> observe. For cases in which your state fits in memory but GC is a problem
>>> you could try using the G1 garbage collector which offers better
>>> performance for the FSStateBackend than the default.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> Am 10.02.2017 um 11:16 schrieb Vinay Patil <[hidden email]
>>> >:
>>>
>>> Hi,
>>>
>>> I am doing performance test for my pipeline keeping FSStateBackend, I
>>> have observed frequent Full GC's after processing 20M records.
>>>
>>> When I did memory analysis using MAT, it showed that the many objects
>>> maintained by Flink state are live.
>>>
>>> Flink keeps the state in memory even after checkpointing , when does
>>> this state gets removed / GC. (I am using window operator in which the DTO
>>> comes as input)
>>>
>>> Also why does Flink keep the state in memory after checkpointing ?
>>>
>>> P.S Using RocksDB is not causing Full GC at all.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>>
>>>
>>> --
>>> If you reply to this email, your message will be added to the discussion
>>> below:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-tp11564p11565.html
>>> To start a new topic under Apache Flink User Mailing List archive.,
>>> email [hidden email]
>>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>>> NAML
>>> 
>>>
>>
>>
>> --
>> View this message in context: Re: Frequent Full GC's in case of
>> FSStateBackend
>> 
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> 
>> at Nabble.com.
>>
>>
>>
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-tp11564p12126.
> html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1...@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> 
> .
> NAML
> 

Re: Frequent Full GC's in case of FSStateBackend

2017-03-08 Thread saiprasad mishra
Thanks Vinay for the quick reply.

Yes, the RocksDB version is working perfectly without any issues, but it needs
more planning on the hardware side for the servers running the job.

As you said and observed, FsStateBackend is not useful for large state that
does not fit in memory, and we do have a very large state.

Regards
Sai

On Wed, Mar 8, 2017 at 6:21 PM, vinay patil  wrote:

> Hi Sai,
>
> If you are sure that your state will not exceed the memory limit of nodes
> then you should consider FSStatebackend otherwise you should go for RocksDB
>
> What is the configuration of your cluster ?
>
> On Mar 9, 2017 7:31 AM, "saiprasad mishra [via Apache Flink User Mailing
> List archive.]" <[hidden email]
> > wrote:
>
>> Hi All
>>
>> I am also seeing issues with FsStateBackend as it stalls coz of full gc.
>> We have very large state,
>> Does this mean the below doc should not claim that FsStateBackend is
>> encouraged for large state.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> ops/state_backends.html#the-fsstatebackend
>>
>> Regards
>> Sai
>>
>> On Fri, Feb 10, 2017 at 6:19 AM, Stefan Richter <[hidden email]
>> > wrote:
>>
>>> Async snapshotting is the default.
>>>
>>> Am 10.02.2017 um 14:03 schrieb vinay patil <[hidden email]
>>> >:
>>>
>>> Hi Stephan,
>>>
>>> Thank you for the clarification.
>>> Yes with RocksDB I don't see Full GC happening, also I am using Flink
>>> 1.2.0 version and I have set the statebackend in flink-conf.yaml file to
>>> rocksdb, so by default does this do asynchronous checkpointing or I have to
>>> specify it at the job level  ?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Fri, Feb 10, 2017 at 4:16 PM, Stefan Richter [via Apache Flink User
>>> Mailing List archive.] <[hidden email]> wrote:
>>>
 Hi,

 FSStateBackend operates completely on-heap and only snapshots for
 checkpoints go against the file system. This is why the backend is
 typically faster for small states, but can become problematic for larger
 states. If your state exceeds a certain size, you should strongly consider
 to use RocksDB as backend. In particular, RocksDB also offers asynchronous
 snapshots which is very valuable to keep stream processing running for
 large state. RocksDB works on native memory/disk, so there is no GC to
 observe. For cases in which your state fits in memory but GC is a problem
 you could try using the G1 garbage collector which offers better
 performance for the FSStateBackend than the default.

 Best,
 Stefan


 Am 10.02.2017 um 11:16 schrieb Vinay Patil <[hidden email]
 >:

 Hi,

 I am doing performance test for my pipeline keeping FSStateBackend, I
 have observed frequent Full GC's after processing 20M records.

 When I did memory analysis using MAT, it showed that the many objects
 maintained by Flink state are live.

 Flink keeps the state in memory even after checkpointing , when does
 this state gets removed / GC. (I am using window operator in which the DTO
 comes as input)

 Also why does Flink keep the state in memory after checkpointing ?

 P.S Using RocksDB is not causing Full GC at all.

 Regards,
 Vinay Patil




 --
 If you reply to this email, your message will be added to the
 discussion below:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nab
 ble.com/Frequent-Full-GC-s-in-case-of-FSStateBackend-tp11564p11565.html
 To start a new topic under Apache Flink User Mailing List archive.,
 email [hidden email]
 To unsubscribe from Apache Flink User Mailing List archive., click here
 .
 NAML
 

>>>
>>>
>>> --
>>> View this message in context: Re: Frequent Full GC's in case of
>>> FSStateBackend
>>> 
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive
>>> 
>>> at Nabble.com.
>>>
>>>
>>>
>>
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-flink-user-mailing-list-archive.233605

ProcessFunction example

2017-03-08 Thread Philippe Caparroy
I think there is an error in the code snippet describing the ProcessFunction 
timeout example:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html


@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
        throws Exception {

    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();

    // check if this is an outdated timer or the latest timer
    if (timestamp == result.lastModified) {
        // emit the state
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}
If, as stated in the example, the CountWithTimeoutFunction should emit a 
key/count if no further update occurred during the one minute elapsed since the last 
update, the test should be:

if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}

As stated in the Javadoc of ProcessFunction: the timestamp argument of the onTimer 
method is the timestamp of the firing timer.