Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Chesnay Schepler
It would also be useful to know which source/sink you are using and 
which kafka version that is.


On 07.02.2018 08:58, Chesnay Schepler wrote:

Thanks for reporting this.

To narrow things down a bit, is your job using both kafka sources and 
sinks?


On 06.02.2018 21:30, Edward wrote:
I'm having an issue where off-heap memory is growing unchecked until I get
OOM exceptions.

I was hoping that upgrading to 1.4 would solve these, since the child-first
classloader is supposed to resolve issues with Avro classes cached in a
different classloader (which prevents the classloaders from being garbage
collected).

However, after upgrading, we are still seeing an off-heap memory leak. I
think I may have isolated the issue to the JmxReporter class used for
collecting Kafka metrics.

Here are the details of what I'm seeing:
Our cluster is running in kubernetes, using the latest flink:1.4 docker
image. We are using the default classloading order (child first).

If I resubmit my job repeatedly, the ClassLoaders from the previous job
submissions don't get cleaned up, and the non-heap memory slowly grows until
the task manager runs out of memory.

I can see all of the un-deleted classloaders if I run "sudo -u flink 
jmap
-clstats " (the output is below). This list of dead 
classloaders
continues to grow every time I kill and resubmit a new Flink job.  In 
all,
it lists 3200 dead class loaders. I'm only going to upload the ones 
which

show more than 2K of used memory.

finding class loader instances ..done.
computing per loader stat ..done.
please wait.. computing liveness.liveness analysis may be inaccurate ...
class_loader    classes  bytes     parent_loader  alive?  type
0x807302a0      7522     12213076  0x804c58c0     dead    sun/misc/Launcher$AppClassLoader@0x0001f070
0x8eb0          3699     6021535   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x94200190      3693     6016807   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x9e7bc6c8      3696     6001081   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xa9d80008      3584     5530412   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xf4103650      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x901801f8      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x942637c0      3231     5121176   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x96c2ec00      3231     5119662   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x8f60          3225     5116241   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x92700d48      3228     5112270   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
                2548     4424440   null           live
0x96b77190      2234     3634602   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98



Next I took a heap dump:
sudo -u flink jmap -dump:format=b,file=/tmp/HeapDump.hprof 

Then, using Eclipse Memory Analyzer, I followed the steps from this blog
post:
http://java.jiderhamn.se/2011/12/11/classloader-leaks-i-how-to-find-classloader-leaks-with-eclipse-memory-analyser-mat/ 



The result of looking for strong references to classes in a dead classloader
is this tree:

Class Name                                                                    | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x94200190 | 88 | 616,992
'- class org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0x94250cb0 | 0 | 0
   '- org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0xbae537e8 | 24 | 328
      '- object com.sun.jmx.mbeanserver.NamedObject @ 0xbace01e0 | 24 | 24
         '- value java.util.HashMap$Node @ 0xbace0110 | 32 | 232

Re: Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Chesnay Schepler

Thanks for reporting this.

To narrow things down a bit, is your job using both kafka sources and sinks?

On 06.02.2018 21:30, Edward wrote:

I'm having an issue where off-heap memory is growing unchecked until I get
OOM exceptions.

I was hoping that upgrading to 1.4 would solve these, since the child-first
classloader is supposed to resolve issues with Avro classes cached in a
different classloader (which prevents the classloaders from being garbage
collected).

However, after upgrading, we are still seeing an off-heap memory leak. I
think I may have isolated the issue to the JmxReporter class used for
collecting Kafka metrics.

Here are the details of what I'm seeing:
Our cluster is running in kubernetes, using the latest flink:1.4 docker
image. We are using the default classloading order (child first).

If I resubmit my job repeatedly, the ClassLoaders from the previous job
submissions don't get cleaned up, and the non-heap memory slowly grows until
the task manager runs out of memory.

I can see all of the un-deleted classloaders if I run "sudo -u flink jmap
-clstats " (the output is below). This list of dead classloaders
continues to grow every time I kill and resubmit a new Flink job.  In all,
it lists 3200 dead class loaders. I'm only going to upload the ones which
show more than 2K of used memory.

finding class loader instances ..done.
computing per loader stat ..done.
please wait.. computing liveness.liveness analysis may be inaccurate ...
class_loader    classes  bytes     parent_loader  alive?  type
0x807302a0      7522     12213076  0x804c58c0     dead    sun/misc/Launcher$AppClassLoader@0x0001f070
0x8eb0          3699     6021535   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x94200190      3693     6016807   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x9e7bc6c8      3696     6001081   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xa9d80008      3584     5530412   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xf4103650      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x901801f8      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x942637c0      3231     5121176   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x96c2ec00      3231     5119662   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x8f60          3225     5116241   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x92700d48      3228     5112270   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
                2548     4424440   null           live
0x96b77190      2234     3634602   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98

Next I took a heap dump:
sudo -u flink jmap -dump:format=b,file=/tmp/HeapDump.hprof 

Then, using Eclipse Memory Analyzer, I followed the steps from this blog
post:
http://java.jiderhamn.se/2011/12/11/classloader-leaks-i-how-to-find-classloader-leaks-with-eclipse-memory-analyser-mat/

The result of looking for strong references to classes in a dead classloader
is this tree:

Class Name                                                                    | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x94200190 | 88 | 616,992
'- class org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0x94250cb0 | 0 | 0
   '- org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0xbae537e8 | 24 | 328
      '- object com.sun.jmx.mbeanserver.NamedObject @ 0xbace01e0 | 24 | 24
         '- value java.util.HashMap$Node @ 0xbace0110 | 32 | 232
            '- [247] java.util.HashMap$Node[512] @ 0xfa0d08c0 | 2,064 | 120,104
               '- table java.util.HashMap @ 0x806
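
The KafkaMbean above is what keeps the dead classloader reachable: Kafka's JmxReporter registers its metrics as MBeans on the JVM-wide platform MBeanServer, and those beans hold references into the user-code classloader until they are unregistered (which normally happens when the Kafka consumer/producer is closed). A minimal sketch for inspecting and, as a workaround, unregistering such leftovers, assuming Kafka's default "kafka.consumer" and "kafka.producer" JMX domains:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class KafkaMbeanCleanup {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Domains used by Kafka's JmxReporter; adjust if your metrics use a different prefix.
        for (String domain : new String[] {"kafka.consumer", "kafka.producer"}) {
            for (ObjectName name : server.queryNames(new ObjectName(domain + ":*"), null)) {
                System.out.println("Kafka MBean still registered: " + name);
                // Each registered bean pins the classloader that loaded its class.
                server.unregisterMBean(name);
            }
        }
    }
}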

Re: How to monitor the latency?

2018-02-06 Thread Chesnay Schepler

Correct, in pre-1.5 the latency metric can only be used via the JMXReporter.
With 1.5 you will be able to access the latency metric via any reporter 
or the REST API, but, as it stands, still not via the WebUI.
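
For reference, a minimal flink-conf.yaml snippet for enabling the JMX reporter so the latency metric becomes visible (the port range is an assumption to adapt to your setup):

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789-8799

After restarting the cluster, the metrics can then be browsed with JConsole or VisualVM against the TaskManager's JMX port.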


On 07.02.2018 05:05, Marvin777 wrote:
As far as I know, the latency metric can currently only be used via the 
JMXReporter.


FLINK-7608 may help you.


2018-02-07 11:40 GMT+08:00 FatMouse <934336...@qq.com>:


Hello:

I have set the LatencyTrackingInterval to 2000, but in the `Task
Metrics` view the latency is always NaN. How can I monitor the latency? Thanks.


Code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setLatencyTrackingInterval(2000);

Best,

MAtrix42






Re: Get the JobID when Flink job fails

2018-02-06 Thread Chesnay Schepler

As far as I know there's no way to specify the JobID when submitting a job.

I've responded to your previous question in a separate mail.

On 06.02.2018 18:14, Vinay Patil wrote:

Hi,

I see we can generate our own JobID, but how do I use it to submit the 
job to the cluster.


I am using remoteExecutionEnvironment to submit the job to the cluster.

Also, could you please answer the query from my earlier mail?

Regards,
Vinay Patil

On Thu, Feb 1, 2018 at 1:50 PM, Vinay Patil wrote:


Hi,

When the Flink job executes successfully I get the jobID, however
when the Flink job fails the jobID is not returned.

How do I get the jobId in this case ?

Do I need to call /joboverview REST api to get the job ID by 
looking for the Job Name ?


Regards,
Vinay Patil






Re: Get the JobID when Flink job fails

2018-02-06 Thread Chesnay Schepler
Yes I think that's the only way to get it. I'll open a JIRA to print the 
JobID also for failed jobs.
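
A rough sketch of that lookup against the pre-1.5 REST API, assuming the JobManager web port 8081 and a job named "my-job":

curl http://<jobmanager>:8081/joboverview
# The response contains "running" and "finished" arrays whose entries carry
# fields such as "jid", "name" and "state"; pick the entry whose "name" is
# "my-job" (and whose "state" is FAILED) and use its "jid".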


On 01.02.2018 20:50, Vinay Patil wrote:

Hi,

When the Flink job executes successfully I get the jobID, however when 
the Flink job fails the jobID is not returned.


How do I get the jobId in this case ?

Do I need to call /joboverview REST api to get the job ID by  looking 
for the Job Name ?


Regards,
Vinay Patil





Maximizing resource allocation in EMR (EC2)

2018-02-06 Thread Ishwara Varnasi
Hello, my question is regarding running Flink (streaming) on EMR. How do I
maximize resource allocation? For example, how do I allocate the optimal number
of YARN job managers, slots, heap sizes, etc. given the EC2 instance types I'll
be using in EMR? I see that (by trial and error) I'm able to allocate just two
thirds of the available memory to YARN. Thanks for any suggestions.
thanks
Ishwara Varnasi
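
As a starting point, with a YARN session the resources are usually fixed explicitly when the session is started; a sketch with placeholder numbers to tune per instance type (the YARN-side ceiling is governed by yarn.nodemanager.resource.memory-mb, which EMR sets per instance type and which explains why only part of the instance memory is usable):

# 4 TaskManager containers, 4 slots each, 2 GB JobManager, 8 GB per TaskManager
./bin/yarn-session.sh -n 4 -s 4 -jm 2048 -tm 8192 -d

# submit the streaming job with a parallelism matching containers * slots
./bin/flink run -p 16 my-streaming-job.jar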


Re: How to monitor the latency?

2018-02-06 Thread Marvin777
As far as I know, the latency metric can currently only be used via the
JMXReporter.

FLINK-7608 may help you.

2018-02-07 11:40 GMT+08:00 FatMouse <934336...@qq.com>:

> Hello:
>
> I have set the LatencyTrackingInterval to 2000, but in the `Task Metrics`
> view the latency is always NaN. How can I monitor the latency? Thanks.
>
>
> Code:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
> env.getConfig().setLatencyTrackingInterval(2000);
>
> Best,
>
> MAtrix42
>


How to monitor the latency?

2018-02-06 Thread FatMouse
Hello:


I have set the LatencyTrackingInterval to 2000, but in the `Task Metrics` 
view the latency is always NaN. How can I monitor the latency? Thanks.





Code:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setLatencyTrackingInterval(2000);



Best,


MAtrix42

Triggering a Savepoint

2018-02-06 Thread Gregory Fee
Hi group, I want to bootstrap some aggregates based on historic data in S3 and 
then keep them updated based on a stream. To do this I was thinking of doing 
something like processing all of the historic data, taking a savepoint, then 
restoring my program from that savepoint but with a stream source instead. 
Does this seem like a reasonable approach, or is there a better way to achieve 
this functionality? There does not appear to be a straightforward way of doing 
it the way I was thinking, so any advice would be appreciated. Thanks!
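
For reference, a sketch of that round trip with the CLI (job IDs and paths are placeholders; the operators need stable uid()s so that the savepoint taken from the bootstrap version of the program maps onto the streaming version):

# run the job over the historic data, then trigger a savepoint
bin/flink savepoint <jobId> s3://my-bucket/flink/savepoints

# cancel the bootstrap job once the savepoint has completed
bin/flink cancel <jobId>

# start the stream-source version of the program from that savepoint
bin/flink run -s s3://my-bucket/flink/savepoints/savepoint-<xyz> my-streaming-job.jar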

Classloader leak with Kafka Mbeans / JMX Reporter

2018-02-06 Thread Edward
I'm having an issue where off-heap memory is growing unchecked until I get
OOM exceptions.

I was hoping that upgrading to 1.4 would solve these, since the child-first
classloader is supposed to resolve issues with Avro classes cached in a
different classloader (which prevents the classloaders from being garbage
collected).

However, after upgrading, we are still seeing an off-heap memory leak. I
think I may have isolated the issue to the JmxReporter class used for
collecting Kafka metrics.

Here are the details of what I'm seeing:
Our cluster is running in kubernetes, using the latest flink:1.4 docker
image. We are using the default classloading order (child first).

If I resubmit my job repeatedly, the ClassLoaders from the previous job
submissions don't get cleaned up, and the non-heap memory slowly grows until
the task manager runs out of memory. 

I can see all of the un-deleted classloaders if I run "sudo -u flink jmap
-clstats " (the output is below). This list of dead classloaders
continues to grow every time I kill and resubmit a new Flink job.  In all,
it lists 3200 dead class loaders. I'm only going to upload the ones which
show more than 2K of used memory.

finding class loader instances ..done.
computing per loader stat ..done.
please wait.. computing liveness.liveness analysis may be inaccurate ...
class_loader    classes  bytes     parent_loader  alive?  type
0x807302a0      7522     12213076  0x804c58c0     dead    sun/misc/Launcher$AppClassLoader@0x0001f070
0x8eb0          3699     6021535   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x94200190      3693     6016807   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x9e7bc6c8      3696     6001081   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xa9d80008      3584     5530412   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0xf4103650      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x901801f8      3581     5527354   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x942637c0      3231     5121176   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x96c2ec00      3231     5119662   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x8f60          3225     5116241   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
0x92700d48      3228     5112270   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98
                2548     4424440   null           live
0x96b77190      2234     3634602   0x807302a0     dead    org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader@0x0001005cdc98

Next I took a heap dump:
sudo -u flink jmap -dump:format=b,file=/tmp/HeapDump.hprof 

Then, using Eclipse Memory Analyzer, I followed the steps from this blog
post:
http://java.jiderhamn.se/2011/12/11/classloader-leaks-i-how-to-find-classloader-leaks-with-eclipse-memory-analyser-mat/

The result of looking for strong references to classes in a dead classloader
is this tree:

Class Name                                                                    | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader @ 0x94200190 | 88 | 616,992
'- class org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0x94250cb0 | 0 | 0
   '- org.apache.kafka.common.metrics.JmxReporter$KafkaMbean @ 0xbae537e8 | 24 | 328
      '- object com.sun.jmx.mbeanserver.NamedObject @ 0xbace01e0 | 24 | 24
         '- value java.util.HashMap$Node @ 0xbace0110 | 32 | 232
            '- [247] java.util.HashMap$Node[512] @ 0xfa0d08c0 | 2,064 | 120,104

Fwd: Question about flink checkpoint

2018-02-06 Thread Chengzhi Zhao
Hey, I am new to flink and I have a question and want to see if anyone can
help here.

So we have a s3 path that flink is monitoring that path to see new files
available.

val avroInputStream_activity = env.readFile(format, path,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1)

I am doing both internal and external checkpointing. Let's say a bad file
arrives at the path and Flink does several retries. I want to remove those
bad files and let the process continue. However, since the file path
persists in the checkpoint, when I try to resume from the external
checkpoint it throws the following error because the file can no longer be found.

java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No
such file or directory: s3a://myfile

Is there a way to skip this bad file and move on?
Thanks in advance.

Best,
Chengzhi Zhao
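
For context, a minimal sketch of the continuous-monitoring source being described, using the Java API with a plain text format and placeholder path (the original job uses an Avro format):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousS3Read {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "s3a://my-bucket/input"; // placeholder path
        TextInputFormat format = new TextInputFormat(new Path(path));
        // re-scan the directory every 60s and read newly appearing files
        DataStream<String> lines =
            env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);
        lines.print();
        env.execute("continuous-file-read");
    }
}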


Re: Get the JobID when Flink job fails

2018-02-06 Thread Vinay Patil
Hi,

I see we can generate our own JobID, but how do I use it to submit the job
to the cluster.

I am using remoteExecutionEnvironment to submit the job to the cluster.

Also, could you please answer the query from my earlier mail?

Regards,
Vinay Patil

On Thu, Feb 1, 2018 at 1:50 PM, Vinay Patil  wrote:

> Hi,
>
> When the Flink job executes successfully I get the jobID, however when the
> Flink job fails the jobID is not returned.
>
> How do I get the jobId in this case ?
>
> Do I need to call /joboverview REST api to get the job ID by  looking for
> the Job Name ?
>
> Regards,
> Vinay Patil
>


Re: Flink CEP exception during RocksDB update

2018-02-06 Thread Kostas Kloudas
Hi Varun,

The branch I previously sent you has been now merged to the master.
So could you try the master and tell us if you see any change in the behavior? 
Has the problem been fixed, or has the message of the exception changed?

Thanks, 
Kostas

> On Jan 29, 2018, at 10:09 AM, Kostas Kloudas  
> wrote:
> 
> Hi again Varun,
> 
> I am investigating the problem you mentioned and I found a bug in the 
> SharedBuffer, 
> but I am not sure if it is the only bug that affects you.
> 
> Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv 
>  and let me know
> if the problem is still there?
> 
> In addition, are you using Scala with case classes or Java?
> 
> Thanks for helping fix the problem,
> Kostas
> 
>> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas > > wrote:
>> 
>> Hi Varun,
>> 
>> Thanks for taking time to look into it. Could you give a sample input and 
>> your pattern to reproduce the problem?
>> That would help a lot at figuring out the cause of the problem.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 23, 2018, at 5:40 PM, Varun Dhore >> > wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> I was able to reproduce the error with 1.4.0. After upgrading the cluster 
>>> to the 1.5 snapshot and running through the same data I am still experiencing 
>>> the same exception. The CEP patterns that I am running use followed-by 
>>> patterns, e.g. AfBfC (A followed by B followed by C). From my experience I was 
>>> never able to get stable execution when checkpoints are enabled. When I disable 
>>> checkpoints the CEP jobs run fine. Aside from this particular error I also 
>>> notice that the majority of checkpoints expire as they do not complete within 
>>> the configured 5 min timeout period. Any suggestions on further debugging 
>>> runtime checkpoints would be very helpful. 
>>> Thanks in advance for your assistance.
>>> 
>>> Regards,
>>> Varun 
>>> 
>>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas >> > wrote:
>>> 
 Thanks a lot Varun!
 
 Kostas
 
> On Jan 17, 2018, at 9:59 PM, Varun Dhore  > wrote:
> 
> Thank you Kostas. Since this error is not easily reproducible on my end 
> I’ll continue testing this and confirm the resolution once I am able to 
> do so.
> 
> Thanks,
> Varun 
> 
> Sent from my iPhone
> 
> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas  > wrote:
> 
>> Hi Varun,
>> 
>> This can be related to this issue: 
>> https://issues.apache.org/jira/browse/FLINK-8226 
>> 
>> which is currently fixed on the master.
>> 
>> Could you please try the current master to see if the error persists?
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore >> > wrote:
>>> 
>>> 
>>> 
 Hello Flink community,
  
 I have encountered following exception while testing 1.4.0 release. 
 This error is occurring intermittently and my CEP job keeps restarting 
 after this exception. I am running the job with Event time semantics 
 and checkpoints enabled.
  
  
 java.lang.RuntimeException: Exception occurred while 
 processing valve output watermark:
 at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
 at 
 org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
 at 
 org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
 at 
 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
 at 
 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
 at 
 org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.RuntimeException: Error while adding 
 data to RocksDB
 at 
 org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
 at 
 org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(Abstract

Re: CEP issue

2018-02-06 Thread Kostas Kloudas
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas
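
For context, a minimal sketch of the kind of relaxed-contiguity pattern being discussed, assuming a simple String event type and a made-up condition (this is not the RelaxedContiguousPattern helper from the thread):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RelaxedPatternSketch {

    // "20 matching events within 8 hours": one starting match followed by 19
    // more, with relaxed contiguity between them.
    public static Pattern<String, String> twentyWithinEightHours() {
        SimpleCondition<String> condition = new SimpleCondition<String>() {
            @Override
            public boolean filter(String event) {
                return event.startsWith("ERR"); // placeholder acceptance criterion
            }
        };
        return Pattern.<String>begin("start").where(condition)
                .followedBy("rest").where(condition).times(19)
                .within(Time.hours(8));
    }
}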

> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi  wrote:
> 
> This is the pattern. Will create a test case. 
> /**
>  *
>  * @param condition a single condition is applied as a  acceptance criteria
>  * @param params defining the bounds of the pattern.
>  * @param  the element in the stream
>  * @return compiled pattern alonf with the params.
>  */
> public static  RelaxedContiguousPattern 
> of(SimpleCondition condition,
>   
> RelaxedContiguityWithinTime params,
>   
> RichMapFunction>, List>> mapFunc,
>   
> String patternId) {
> assert (params.seriesLength >= params.elementCount && params.elementCount 
> > 0);
> Pattern pattern = Pattern.
> begin(START).
> where(condition);
> if (params.elementCount > 1) pattern = pattern.
> followedBy(REST).
> where(condition).
> times(params.elementCount - 1);
> 
> return new RelaxedContiguousPattern(
> pattern.within(Time.minutes(params.seriesLength * 
> params.period.duration))
> ,params, 
> params.elementCount > 1, 
> params.period.duration, 
> mapFunc, 
> patternId
> );
> }
> 
> 
> 
> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz  > wrote:
> Could you provide some example to reproduce the case? Or the Pattern that you 
> are using? It would help track down the issue.
> 
> > On 2 Feb 2018, at 13:35, Vishal Santoshi  > > wrote:
> >
> > I have pulled in the flink master cep library and the runtime ( the cluster 
> > ) is configured to work against the latest and greatest. This does not 
> > happen with smaller range patterns (3 out of 5, 1 of 3, etc.) but is always 
> > an issue when it is a larger range (20 out of 25 with a range of 8 hours). 
> > Does that make sense?
> >
> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz 
> > mailto:wysakowicz.da...@gmail.com>> wrote:
> > This problem sounds very similar to this one that was fixed for 1.4.1 and 
> > 1.5.0:
> > https://issues.apache.org/jira/browse/FLINK-8226 
> > 
> >
> > Could you check if that helps with your problem too?
> >
> > > On 1 Feb 2018, at 23:34, Vishal Santoshi  > > > wrote:
> > >
> > > I have flink master CEP library code imported to  a 1.4 build.
> > >
> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi 
> > > mailto:vishal.santo...@gmail.com>> wrote:
> > > A new one
> > >
> > > java.lang.OutOfMemoryError: Java heap space
> > >   at java.util.Arrays.copyOf(Arrays.java:3332)
> > >   at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> > >   at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:136)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > > .
> > > .
> > > .
> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
> > >
> > >
> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi 
> > > mailto:vishal.santo...@gmail.com>> wrote:
> > > It happens when it looks to throw an exception and calls 
> > > sharedBuffer.toString(), because of the check
> > >
> > >
> > > int id = sharedBuffer.entryId;
> > > Precondition

Re: how to match external checkpoints with jobs during recovery

2018-02-06 Thread Aljoscha Krettek
Hi,

I'm afraid it's currently not possible to distinguish between externalised 
checkpoints when running multiple jobs on one JobManager because the 
externalised checkpoints of all jobs would be written to the same directory.

In Flink 1.5 (which is not yet released, but the code for this is in master 
already) we changed how this part works and the externalised checkpoints data 
is now written to the checkpointing directory that you configure for each job, 
making it possible to distinguish externalised checkpoints. There is no more 
global directory for externalised checkpoints where the data for all jobs is 
written to.

Best,
Aljoscha
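
For reference, in 1.4 that single global directory is the one configured in flink-conf.yaml (path is a placeholder), while externalised checkpoints themselves are enabled per job:

# flink-conf.yaml (pre-1.5): one target directory for all jobs' externalized checkpoints
state.checkpoints.dir: hdfs:///flink/external-checkpoints

The job enables them via env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION).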

> On 5. Feb 2018, at 20:27, xiatao123  wrote:
> 
> The external checkpoints are in the format of
> checkpoint_metadata-0057
> which I have no idea which job this checkpoint metadata belongs to if I have
> multiple jobs running at the same time.
> 
> If a job failed unexpected, I need to know which checkpoints belongs to the
> failed job. Is there API or someway to show the checkpoints folder for a
> certain job?
> 
> Thanks for the help
> Tao
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Testing flink class loading

2018-02-06 Thread Aljoscha Krettek
Hi,

You can take a look at test-infra/end-to-end-test/test_streaming_classloader.sh 
to see how that testing job is used in the actual test.

Best,
Aljoscha
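
For reference, the class loading order the test exercises is controlled cluster-wide in flink-conf.yaml (child-first is the 1.4 default); the --resolve-first program argument appears to only tell the test job which behaviour to verify, it does not change the cluster's setting:

# flink-conf.yaml
classloader.resolve-order: child-first   # or parent-first; restart the cluster after changing it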

> On 6. Feb 2018, at 07:55, Data Engineer  wrote:
> 
> I am trying to run the ClassLoaderTestProgram on flink.
> 
> 1. I have started Flink in local mode with the following command:
>   bin/jobmanager.sh start local
> 
> 2. I ran the  ClassLoaderTestProgram  jar:
>bin/flink run ClassLoaderTestProgram.jar --resolve-first child 
> --output out.txt
> 
> I get a  java.lang.NoSuchMethodError: 
> org.apache.flink.runtime.taskmanager.TaskManager.getMessage when I run with 
> child-first, and I get the output when I run it in parent-first mode.
> 
> The documentation states that "we get a {@link NoSuchMethodError} if we're 
> running with {@code parent-first} class loading
>   and that we get the correct result from the method when we're running with 
> {@code child-first} class loading."
> 
> Am I doing something wrong here?
> 
> Regards,
> James



Re: deduplication with streaming sql

2018-02-06 Thread Henri Heiskanen
Hi,

Oh, right.. got it. Thanks!

Br,
Henkka

On Tue, Feb 6, 2018 at 5:01 PM, Fabian Hueske  wrote:

> Hi Henkka,
>
> This should be fairly easy to implement in a ProcessFunction.
> You're making a good call to worry about the number of timers. If you
> register a timer multiple times on the same time, the timer is deduplicated
> ;-) and will only fire once for that time.
> That's why the state retention time allows to set a min and max timer.
> With that, you only have to set a timer every (max - min) interval. For
> example, if you say, the application should keep the state at least for 12
> hours but the most for 14 hours, you only need to register a new timer
> every 2 hours.
>
> Hope this helps,
> Fabian
>
> 2018-02-06 15:47 GMT+01:00 Henri Heiskanen :
>
>> Hi,
>>
>> Thanks.
>>
>> Doing this deduplication would be easy just by using vanilla flink api
>> and state (check if this is a new key and then emit), but the issue has
>> been automatic state cleanup. However, it looks like this streaming sql
>> retention time implementation uses the process function and timer. I was a
>> bit reluctant to use that because I was worried that the approach would be
>> overkill with our volumes, but maybe it will work just fine. Can you help
>> me a bit how to implement it efficiently?
>>
>> Basically we get estimated of 20M of distinct rows/key and roughly 300
>> events per key during one day. What I would like to do is to clear the
>> state for specific key if I have not seen such key for last 12 hours. I
>> think its very close to example here: https://ci.apache.org/pr
>> ojects/flink/flink-docs-release-1.4/dev/stream/operators/
>> process_function.html. Instead of emitting the data onTimer I would just
>> clear the state. In the example each tuple will invoke
>> registerEventTimeTimer(). Is this the correct pattern? E.g. in our case we
>> could get hundreds of events with the same key during few minutes, so would
>> we then register hundreds of timer instances?
>>
>> Br,
>> Henkka
>>
>> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske  wrote:
>>
>>> Hi Henri,
>>>
>>> thanks for reaching out and providing code and data to reproduce the
>>> issue.
>>>
>>> I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
>>> result in a retraction stream.
>>>
>>> However, with the current implementation we internally need a retraction
>>> stream if a state retention time is configured.
>>> The reason lies in how state retention time is defined: the state
>>> retention time will remove the state for a key if it hasn't been seen for x
>>> time.
>>> This means that an operator resets a state clean-up timer of a key
>>> whenever a new record with that key is received. This is also true for
>>> retraction / insertion messages of the same record.
>>> If we implement the GroupBy that performs the DISTINCT as an operator
>>> that emits an append stream, all downstream operator won't see any updates
>>> because the GroupBy only emits the first and filters out all duplicates.
>>> Hence, downstream operators would perform a clean-up too early.
>>>
>>> I see that these are internals that users should not need to worry
>>> about, but right now there is no easy solution to this.
>>> Eventually, the clean-up timer reset should be differently implemented
>>> than using retraction and insert of the same record. However, this would be
>>> a more involved change and requires good planning.
>>>
>>> I'll file a JIRA for that.
>>>
>>> Thanks again for bringing the issue to our attention.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-02-06 13:59 GMT+01:00 Timo Walther :
>>>
 Hi Henri,

 I just noticed that I had a tiny mistake in my little test program. So
 SELECT DISTINCT is officially supported. But the question if this is a
 valid append stream is still up for discussion. I will loop in Fabian (in
 CC).

 For the general behavior you can also look into the code and especially
 the comments there [1].

 Regards,
 Timo

 [1] https://github.com/apache/flink/blob/master/flink-libraries/
 flink-table/src/main/scala/org/apache/flink/table/runtime/ag
 gregate/GroupAggProcessFunction.scala


 Am 2/6/18 um 1:36 PM schrieb Timo Walther:

 Hi Henri,

 I try to answer your question:

 1) You are right, SELECT DISTINCT should not need a retract stream.
 Internally, this is translated into an aggregation without an aggregate
 function call. So this definitely needs improvement.

 2) The problem is that SELECT DISTINCT is not officially supported nor
 tested. I opened an issue for this [1].

 Until this issue is fixed I would recommend to implement a custom
 aggregate function that keeps track values seen so far [2].

 Regards,
 Timo

 [1] https://issues.apache.org/jira/browse/FLINK-8564
 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
 dev/table/udfs.html#aggregation-functions


Re: deduplication with streaming sql

2018-02-06 Thread Fabian Hueske
Hi Henkka,

This should be fairly easy to implement in a ProcessFunction.
You're making a good call to worry about the number of timers. If you
register a timer multiple times on the same time, the timer is deduplicated
;-) and will only fire once for that time.
That's why the state retention time allows to set a min and max timer. With
that, you only have to set a timer every (max - min) interval. For example,
if you say, the application should keep the state at least for 12 hours but
the most for 14 hours, you only need to register a new timer every 2 hours.

Hope this helps,
Fabian
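
A minimal sketch of such a deduplicating ProcessFunction, assuming a keyed stream, processing time, and the 12/14 hour bounds from the example above (the element type is a placeholder):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits an element only the first time its key is seen; the key's state is
// cleared once the key has been idle for at least MIN_RETENTION.
public class DedupFunction<T> extends ProcessFunction<T, T> {

    private static final long MIN_RETENTION = 12 * 60 * 60 * 1000L; // keep state at least 12h
    private static final long MAX_RETENTION = 14 * 60 * 60 * 1000L; // clean up at the latest after 14h

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (lastSeen.value() == null) {
            // first occurrence of this key: emit it and schedule a clean-up check
            out.collect(value);
            ctx.timerService().registerProcessingTimeTimer(now + MAX_RETENTION);
        }
        // duplicates only refresh the idle timestamp, they do not register new timers
        lastSeen.update(now);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        Long seen = lastSeen.value();
        if (seen == null || timestamp >= seen + MIN_RETENTION) {
            lastSeen.clear(); // idle long enough: forget the key, it may be emitted again later
        } else {
            // key was active recently: check again later; at most one timer per key is pending
            ctx.timerService().registerProcessingTimeTimer(seen + MAX_RETENTION);
        }
    }
}

It would be applied as input.keyBy(...).process(new DedupFunction<>()); timers for an active key end up spaced at least (max - min) apart, matching the 2 hour interval described above.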

2018-02-06 15:47 GMT+01:00 Henri Heiskanen :

> Hi,
>
> Thanks.
>
> Doing this deduplication would be easy just by using vanilla flink api and
> state (check if this is a new key and then emit), but the issue has been
> automatic state cleanup. However, it looks like this streaming sql
> retention time implementation uses the process function and timer. I was a
> bit reluctant to use that because I was worried that the approach would be
> overkill with our volumes, but maybe it will work just fine. Can you help
> me a bit how to implement it efficiently?
>
> Basically we get estimated of 20M of distinct rows/key and roughly 300
> events per key during one day. What I would like to do is to clear the
> state for specific key if I have not seen such key for last 12 hours. I
> think its very close to example here: https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/dev/stream/
> operators/process_function.html. Instead of emitting the data onTimer I
> would just clear the state. In the example each tuple will invoke
> registerEventTimeTimer(). Is this the correct pattern? E.g. in our case we
> could get hundreds of events with the same key during few minutes, so would
> we then register hundreds of timer instances?
>
> Br,
> Henkka
>
> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske  wrote:
>
>> Hi Henri,
>>
>> thanks for reaching out and providing code and data to reproduce the
>> issue.
>>
>> I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
>> result in a retraction stream.
>>
>> However, with the current implementation we internally need a retraction
>> stream if a state retention time is configured.
>> The reason lies in how state retention time is defined: the state
>> retention time will remove the state for a key if it hasn't been seen for x
>> time.
>> This means that an operator resets a state clean-up timer of a key
>> whenever a new record with that key is received. This is also true for
>> retraction / insertion messages of the same record.
>> If we implement the GroupBy that performs the DISTINCT as an operator
>> that emits an append stream, all downstream operator won't see any updates
>> because the GroupBy only emits the first and filters out all duplicates.
>> Hence, downstream operators would perform a clean-up too early.
>>
>> I see that these are internals that users should not need to worry about,
>> but right now there is no easy solution to this.
>> Eventually, the clean-up timer reset should be differently implemented
>> than using retraction and insert of the same record. However, this would be
>> a more involved change and requires good planning.
>>
>> I'll file a JIRA for that.
>>
>> Thanks again for bringing the issue to our attention.
>>
>> Best, Fabian
>>
>>
>> 2018-02-06 13:59 GMT+01:00 Timo Walther :
>>
>>> Hi Henri,
>>>
>>> I just noticed that I had a tiny mistake in my little test program. So
>>> SELECT DISTINCT is officially supported. But the question if this is a
>>> valid append stream is still up for discussion. I will loop in Fabian (in
>>> CC).
>>>
>>> For the general behavior you can also look into the code and especially
>>> the comments there [1].
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/
>>> flink-table/src/main/scala/org/apache/flink/table/runtime/
>>> aggregate/GroupAggProcessFunction.scala
>>>
>>>
>>> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>>>
>>> Hi Henri,
>>>
>>> I try to answer your question:
>>>
>>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>>> Internally, this is translated into an aggregation without an aggregate
>>> function call. So this definitely needs improvement.
>>>
>>> 2) The problem is that SELECT DISTINCT is not officially supported nor
>>> tested. I opened an issue for this [1].
>>>
>>> Until this issue is fixed I would recommend to implement a custom
>>> aggregate function that keeps track values seen so far [2].
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/table/udfs.html#aggregation-functions
>>>
>>>
>>> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>>>
>>> Hi,
>>>
>>> I have a use case where I would like to find distinct rows over certain
>>> period of time. Requirement is that new row is emitted asap. Otherwise the
>>> requir

Re: deduplication with streaming sql

2018-02-06 Thread Henri Heiskanen
Hi,

Thanks.

Doing this deduplication would be easy just by using vanilla flink api and
state (check if this is a new key and then emit), but the issue has been
automatic state cleanup. However, it looks like this streaming sql
retention time implementation uses the process function and timer. I was a
bit reluctant to use that because I was worried that the approach would be
overkill with our volumes, but maybe it will work just fine. Can you help
me a bit how to implement it efficiently?

Basically we get estimated of 20M of distinct rows/key and roughly 300
events per key during one day. What I would like to do is to clear the
state for specific key if I have not seen such key for last 12 hours. I
think its very close to example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html.
Instead of emitting the data onTimer I would just clear the state. In the
example each tuple will invoke registerEventTimeTimer(). Is this the
correct pattern? E.g. in our case we could get hundreds of events with the
same key during few minutes, so would we then register hundreds of timer
instances?

Br,
Henkka

On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske  wrote:

> Hi Henri,
>
> thanks for reaching out and providing code and data to reproduce the issue.
>
> I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
> result in a retraction stream.
>
> However, with the current implementation we internally need a retraction
> stream if a state retention time is configured.
> The reason lies in how state retention time is defined: the state
> retention time will remove the state for a key if it hasn't been seen for x
> time.
> This means that an operator resets a state clean-up timer of a key
> whenever a new record with that key is received. This is also true for
> retraction / insertion messages of the same record.
> If we implement the GroupBy that performs the DISTINCT as an operator that
> emits an append stream, all downstream operator won't see any updates
> because the GroupBy only emits the first and filters out all duplicates.
> Hence, downstream operators would perform a clean-up too early.
>
> I see that these are internals that users should not need to worry about,
> but right now there is no easy solution to this.
> Eventually, the clean-up timer reset should be differently implemented
> than using retraction and insert of the same record. However, this would be
> a more involved change and requires good planning.
>
> I'll file a JIRA for that.
>
> Thanks again for bringing the issue to our attention.
>
> Best, Fabian
>
>
> 2018-02-06 13:59 GMT+01:00 Timo Walther :
>
>> Hi Henri,
>>
>> I just noticed that I had a tiny mistake in my little test program. So
>> SELECT DISTINCT is officially supported. But the question if this is a
>> valid append stream is still up for discussion. I will loop in Fabian (in
>> CC).
>>
>> For the general behavior you can also look into the code and especially
>> the comments there [1].
>>
>> Regards,
>> Timo
>>
>> [1] https://github.com/apache/flink/blob/master/flink-libraries/
>> flink-table/src/main/scala/org/apache/flink/table/
>> runtime/aggregate/GroupAggProcessFunction.scala
>>
>>
>> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>>
>> Hi Henri,
>>
>> I try to answer your question:
>>
>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>> Internally, this is translated into an aggregation without an aggregate
>> function call. So this definitely needs improvement.
>>
>> 2) The problem is that SELECT DISTINCT is not officially supported nor
>> tested. I opened an issue for this [1].
>>
>> Until this issue is fixed I would recommend to implement a custom
>> aggregate function that keeps track values seen so far [2].
>>
>> Regards,
>> Timo
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/table/udfs.html#aggregation-functions
>>
>>
>> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>>
>> Hi,
>>
>> I have a use case where I would like to find distinct rows over certain
>> period of time. Requirement is that new row is emitted asap. Otherwise the
>> requirement is mainly to just filter out data to have smaller dataset for
>> downstream. I noticed that SELECT DISTINCT and state retention time of 12
>> hours would in theory do the trick. You can find the code below. Few
>> questions.
>>
>> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
>> we would get update/delete rows?
>>
>> 2) If I run the below code with the example data (also below) without
>> state retention config I get the two append rows (expected). If I run
>> exactly the code below (with the retention config) I'll get two appends and
>> one delete for AN1234 and then one append for AN. What is going on?
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> StreamTableEnvironment tabl

kafka as recovery only source

2018-02-06 Thread Sofer, Tovi
Hi group,

I wanted to get your suggestion on how to implement two requirements we have:

* One is to read from an external message queue (JMS) with very low latency.

* The second is to support zero data loss, so that in case of restart and 
recovery, messages that were not checkpointed (and are not part of state) will 
be replayed again.

(which implies some kind of replayable source)

Because of the first requirement we can't write the JMS messages to Kafka first 
and only then read from Kafka, because it would increase latency.
Instead we thought to consume the JMS messages and forward them both to the job 
and to a KafkaSink.
Then, in case of failure and recovery, we want to start in recovery mode and 
read messages from the offset matching the state/checkpoint.
How can this be done? We thought to somehow save the last flushed Kafka offset 
in the state.
The problem is that this information is available only via a future/interceptor 
and we don't know how to connect it to state, so a RecoverySource can use it...

So current suggestion looks something like:

Happy path:
JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators), KafkaSink
(Here maybe we can add ProducerInterceptor-> which saves offset to state 
somehow)

Failure path: (will run before HappyPath to recover data)
RecoverySource-> JobMessageParser(and additional operators)
(Here maybe add Queryable state client which reads offsets from other operator 
state)

Thanks,
Tovi
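
Just to illustrate the interceptor part of the idea, a rough sketch of tracking the last acknowledged offset per topic-partition with a Kafka ProducerInterceptor (registered via the producer's interceptor.classes property); how to hand this over into Flink state / a RecoverySource is exactly the open part of the question:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Tracks the highest offset acknowledged by the broker per topic-partition.
public class OffsetTrackingInterceptor implements ProducerInterceptor<byte[], byte[]> {

    // Static so another component could read it; a real solution would need a
    // cleaner hand-over into Flink state (e.g. via a sink's snapshotState()).
    public static final Map<String, Long> LAST_ACKED = new ConcurrentHashMap<>();

    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null && metadata != null) {
            LAST_ACKED.merge(metadata.topic() + "-" + metadata.partition(),
                             metadata.offset(), Math::max);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}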



Re: deduplication with streaming sql

2018-02-06 Thread Fabian Hueske
Hi Henri,

thanks for reaching out and providing code and data to reproduce the issue.

I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
result in a retraction stream.

However, with the current implementation we internally need a retraction
stream if a state retention time is configured.
The reason lies in how state retention time is defined: the state retention
time will remove the state for a key if it hasn't been seen for x time.
This means that an operator resets a state clean-up timer of a key whenever
a new record with that key is received. This is also true for retraction /
insertion messages of the same record.
If we implement the GroupBy that performs the DISTINCT as an operator that
emits an append stream, all downstream operator won't see any updates
because the GroupBy only emits the first and filters out all duplicates.
Hence, downstream operators would perform a clean-up too early.

I see that these are internals that users should not need to worry about,
but right now there is no easy solution to this.
Eventually, the clean-up timer reset should be differently implemented than
using retraction and insert of the same record. However, this would be a
more involved change and requires good planning.

I'll file a JIRA for that.

Thanks again for bringing the issue to our attention.

Best, Fabian


2018-02-06 13:59 GMT+01:00 Timo Walther :

> Hi Henri,
>
> I just noticed that I had a tiny mistake in my little test program. So
> SELECT DISTINCT is officially supported. But the question if this is a
> valid append stream is still up for discussion. I will loop in Fabian (in
> CC).
>
> For the general behavior you can also look into the code and especially
> the comments there [1].
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-
> libraries/flink-table/src/main/scala/org/apache/flink/
> table/runtime/aggregate/GroupAggProcessFunction.scala
>
>
> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>
> Hi Henri,
>
> I try to answer your question:
>
> 1) You are right, SELECT DISTINCT should not need a retract stream.
> Internally, this is translated into an aggregation without an aggregate
> function call. So this definitely needs improvement.
>
> 2) The problem is that SELECT DISTINCT is not officially supported nor
> tested. I opened an issue for this [1].
>
> Until this issue is fixed I would recommend to implement a custom
> aggregate function that keeps track values seen so far [2].
>
> Regards,
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-8564
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/table/udfs.html#aggregation-functions
>
>
> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>
> Hi,
>
> I have a use case where I would like to find distinct rows over certain
> period of time. Requirement is that new row is emitted asap. Otherwise the
> requirement is mainly to just filter out data to have smaller dataset for
> downstream. I noticed that SELECT DISTINCT and state retention time of 12
> hours would in theory do the trick. You can find the code below. Few
> questions.
>
> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios we
> would get update/delete rows?
>
> 2) If I run the below code with the example data (also below) without
> state retention config I get the two append rows (expected). If I run
> exactly the code below (with the retention config) I'll get two appends and
> one delete for AN1234 and then one append for AN. What is going on?
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> StreamTableEnvironment tableEnv = TableEnvironment.
> getTableEnvironment(env);
>
> StreamQueryConfig qConfig = tableEnv.queryConfig();
> // set idle state retention time. min = max = 12 hours
> qConfig.withIdleStateRetentionTime(Time.hours(12));
>
> // create a TableSource
> CsvTableSource csvSource = CsvTableSource
> .builder()
> .path("data.csv")
> .field("ts", Types.SQL_TIMESTAMP())
> .field("aid1", Types.STRING())
> .field("aid2", Types.STRING())
> .field("advertiser_id", Types.STRING())
> .field("platform_id", Types.STRING())
> .fieldDelimiter(",")
> .build();
>
> tableEnv.registerTableSource("CsvTable", csvSource);
>
> Table result = tableEnv.sqlQuery(
> "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>
> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(new
> String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
> new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
> Types.STRING()});
>
> result.writeToSink(out, qConfig);
>
> env.execute();
>
>
> Here is a simple csv dataset of three rows:
>
> 2018-01-31 12:00:00,AN1234,RC1234,---0,1234-1234-
> 1234-1234,1234567890
> 2018-01-31 12:00:02,AN1234,RC1234,---0,1234-1234-
> 1234-1234,1234567890
> 2018-01-31 12:00:02,AN,RC,---1,1234-1234-
> 1234-1234,1234567891
>
>
>
>


Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread Fabian Hueske
According to the JavaDocs of BucketingSink, in-progress files are still
being written to.
I don't know what would cause a file to remain in that state.

Another thing to mention, you might want to ensure that only the last task
that renames the file generates the _SUCCESS file.
Otherwise, the data might be consumed too early. You'd have to figure out
how to do that in a race-condition free way.

Best, Fabian

2018-02-06 11:03 GMT+01:00 xiaobin yan :

> Hi,
>
> I think it can be judged at the BucketingSink.notifyCheckpointComplete()
> method  in this way, as shown below:
>
> if (!bucketState.isWriterOpen &&
> bucketState.pendingFiles.isEmpty() &&
> bucketState.pendingFilesPerCheckpoint.isEmpty()) {
> boolean flag = true;
> RemoteIterator<LocatedFileStatus> files = fs.listFiles(new 
> Path(directory), false);
> while (files.hasNext()) {
> LocatedFileStatus file = files.next();
> String fileName = file.getPath().getName();
> if (fileName.lastIndexOf(".") != -1) {
> flag = false;
> break;
> }
> }
> Path path = new Path(directory + "/_SUCCESS");
> if (flag && !fs.exists(path)){
> FSDataOutputStream outputStream = fs.create(path);
> outputStream.flush();
> outputStream.close();
> }
> // We've dealt with all the pending files and the writer for this bucket 
> is not currently open.
> // Therefore this bucket is currently inactive and we can remove it from 
> our state.
> bucketStatesIt.remove();
> }
>
>
> But we find this problem: occasionally, a file is always in the
> in-progress state, and the amount of data processed by each sub task is
> almost the same, and there is no data skew. There are no exceptions in the
> program.
>
> Best,
> Ben
>
>
>
> On 6 Feb 2018, at 5:50 PM, xiaobin yan 
> wrote:
>
> Hi,
>
> Thanks for your reply! See here :https://github.com/apache/
> flink/blob/master/flink-connectors/flink-connector-
> filesystem/src/main/java/org/apache/flink/streaming/
> connectors/fs/bucketing/BucketingSink.java#L652  After calling this
> method, the file is renamed,and we can't determine which subtask is
> finished at last.
>
> Best,
> Ben
>
> On 6 Feb 2018, at 5:35 PM, Fabian Hueske  wrote:
>
> The notifyCheckpointComplete() method will be called when all subtasks of
> a job completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
>
> Please note that you need to handle the case where multiple tasks try to
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and
> modifications.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/master/flink-
> runtime/src/main/java/org/apache/flink/runtime/state/
> CheckpointListener.java
>
> 2018-02-06 4:14 GMT+01:00 xiaobin yan :
>
>> Hi ,
>>
>> You've got a point. I saw that method, but how can I make sure that all
>> the subtasks checkpoint are finished, because I can only write _SUCCESS
>> file at that time.
>>
>> Best,
>> Ben
>>
>>
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske  wrote:
>>
>> In case of a failure, Flink rolls back the job to the last checkpoint and
>> reprocesses all data since that checkpoint.
>> Also the BucketingSink will truncate a file to the position of the last
>> checkpoint if the file system supports truncate. If not, it writes a file
>> with the valid length and starts a new file.
>>
>> Therefore, all files that the BucketingSink finishes must be treated as
>> volatile until the next checkpoint is completed.
>> Only when a checkpoint is completed a finalized file may be read. The
>> files are renamed on checkpoint to signal that they are final and can be
>> read. This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>
>> Best, Fabian
>>
>>
>>
>>
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan :
>>
>>> Hi ,
>>>
>>> I have tested it. There are some small problems. When checkpoint is
>>> finished, the name of the file will change, and the success file will be
>>> written before checkpoint.
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong  wrote:
>>>
>>> Hi,
>>>
>>> I did not actually test this, but I think with Flink 1.4 you can extend
>>> BucketingSink and overwrite the invoke method to access the watermark
>>>
>>> Pseudo code:
>>>
>>> invoke(IN value, SinkFunction.Context context) {
>>>
>>>long currentWatermark = context.watermark()
>>>
>>>long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>
>>> if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) 
>>> {
>>>
>>>Write _SUCCESS
>>>
>>>lastSuccessWatermark = currentWatermark round down to 1 hour
>>>
>>> }
>>>
>>> invoke(value)
>>>
>>> }
>>>
>>>
>>> Regards,
>>> Kien
>>>
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>
>>> Hi:
>>>
>>> I think so too! But I have a question that when should I add this logic in 
>>> BucketingS

Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther

Hi Henri,

I just noticed that I had a tiny mistake in my little test program. So 
SELECT DISTINCT is officially supported. But the question if this is a 
valid append stream is still up for discussion. I will loop in Fabian 
(in CC).


For the general behavior you can also look into the code and especially 
the comments there [1].


Regards,
Timo

[1] 
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala



Am 2/6/18 um 1:36 PM schrieb Timo Walther:

Hi Henri,

I try to answer your question:

1) You are right, SELECT DISTINCT should not need a retract stream. 
Internally, this is translated into an aggregation without an 
aggregate function call. So this definitely needs improvement.


2) The problem is that SELECT DISTINCT is not officially supported nor 
tested. I opened an issue for this [1].


Until this issue is fixed I would recommend implementing a custom 
aggregate function that keeps track of values seen so far [2].


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions



Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:

Hi,

I have a use case where I would like to find distinct rows over 
certain period of time. Requirement is that new row is emitted asap. 
Otherwise the requirement is mainly to just filter out data to have 
smaller dataset for downstream. I noticed that SELECT DISTINCT and 
state retention time of 12 hours would in theory do the trick. You 
can find the code below. Few questions.


1) Why is SELECT DISTINCT creating a retract stream? In which 
scenarios we would get update/delete rows?


2) If I run the below code with the example data (also below) without 
state retention config I get the two append rows (expected). If I run 
exactly the code below (with the retention config) I'll get two 
appends and one delete for AN1234 and then one append for AN. 
What is going on?


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);


StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
.builder()
.path("data.csv")
.field("ts", Types.SQL_TIMESTAMP())
.field("aid1", Types.STRING())
.field("aid2", Types.STRING())
.field("advertiser_id", Types.STRING())
.field("platform_id", Types.STRING())
.fieldDelimiter(",")
.build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
"SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new 
StdOutRetractStreamTableSink(new String[] {"aid1", "aid2", 
"advertiser_id", "platform_id"},
new TypeInformation[] {Types.STRING(), Types.STRING(), 
Types.STRING(), Types.STRING()});


result.writeToSink(out, qConfig);

env.execute();


Here is a simple csv dataset of three rows:

2018-01-31 
12:00:00,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31 
12:00:02,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31 
12:00:02,AN,RC,---1,1234-1234-1234-1234,1234567891








Re: deduplication with streaming sql

2018-02-06 Thread Timo Walther

Hi Henri,

I try to answer your question:

1) You are right, SELECT DISTINCT should not need a retract stream. 
Internally, this is translated into an aggregation without an aggregate 
function call. So this definitely needs improvement.


2) The problem is that SELECT DISTINCT is not officially supported nor 
tested. I opened an issue for this [1].


Until this issue is fixed I would recommend implementing a custom 
aggregate function that keeps track of values seen so far [2].


Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
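
For illustration only, such a function could take roughly the shape sketched below. The class, accumulator and column names are placeholders, and which columns drive the deduplication depends on the use case.

import org.apache.flink.table.functions.AggregateFunction;

// Rough sketch of the suggested workaround: a user-defined aggregate that simply
// keeps the first value it sees per group. FirstValue is a placeholder name.
public class FirstValue extends AggregateFunction<String, FirstValue.Accumulator> {

    // Accumulator holding the first value observed for a group.
    public static class Accumulator {
        public String value;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public String getValue(Accumulator acc) {
        return acc.value;
    }

    // Invoked by the runtime for every input row of the group.
    public void accumulate(Accumulator acc, String in) {
        if (acc.value == null) {
            acc.value = in;
        }
    }
}

It would be registered with tableEnv.registerFunction("firstValue", new FirstValue()) and used in a GROUP BY query over the columns that should be distinct, e.g. SELECT aid1, aid2, firstValue(advertiser_id), firstValue(platform_id) FROM CsvTable GROUP BY aid1, aid2.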



Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:

Hi,

I have a use case where I would like to find distinct rows over 
certain period of time. Requirement is that new row is emitted asap. 
Otherwise the requirement is mainly to just filter out data to have 
smaller dataset for downstream. I noticed that SELECT DISTINCT and 
state retention time of 12 hours would in theory do the trick. You can 
find the code below. Few questions.


1) Why is SELECT DISTINCT creating a retract stream? In which 
scenarios we would get update/delete rows?


2) If I run the below code with the example data (also below) without 
state retention config I get the two append rows (expected). If I run 
exactly the code below (with the retention config) I'll get two 
appends and one delete for AN1234 and then one append for AN. What 
is going on?


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);


StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
.builder()
.path("data.csv")
.field("ts", Types.SQL_TIMESTAMP())
.field("aid1", Types.STRING())
.field("aid2", Types.STRING())
.field("advertiser_id", Types.STRING())
.field("platform_id", Types.STRING())
.fieldDelimiter(",")
.build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
"SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new 
StdOutRetractStreamTableSink(new String[] {"aid1", "aid2", 
"advertiser_id", "platform_id"},
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), 
Types.STRING()});


result.writeToSink(out, qConfig);

env.execute();


Here is a simple csv dataset of three rows:

2018-01-31 
12:00:00,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31 
12:00:02,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31 
12:00:02,AN,RC,---1,1234-1234-1234-1234,1234567891






deduplication with streaming sql

2018-02-06 Thread Henri Heiskanen
Hi,

I have a use case where I would like to find distinct rows over certain
period of time. Requirement is that new row is emitted asap. Otherwise the
requirement is mainly to just filter out data to have smaller dataset for
downstream. I noticed that SELECT DISTINCT and state retention time of 12
hours would in theory do the trick. You can find the code below. Few
questions.

1) Why is SELECT DISTINCT creating a retract stream? In which scenarios we
would get update/delete rows?

2) If I run the below code with the example data (also below) without state
retention config I get the two append rows (expected). If I run exactly the
code below (with the retention config) I'll get two appends and one delete
for AN1234 and then one append for AN. What is going on?

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
.builder()
.path("data.csv")
.field("ts", Types.SQL_TIMESTAMP())
.field("aid1", Types.STRING())
.field("aid2", Types.STRING())
.field("advertiser_id", Types.STRING())
.field("platform_id", Types.STRING())
.fieldDelimiter(",")
.build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
"SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(new
String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING()});

result.writeToSink(out, qConfig);

env.execute();


Here is a simple csv dataset of three rows:

2018-01-31
12:00:00,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31
12:00:02,AN1234,RC1234,---0,1234-1234-1234-1234,1234567890
2018-01-31
12:00:02,AN,RC,---1,1234-1234-1234-1234,1234567891


Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread xiaobin yan
Hi, 

I think it can be checked in the BucketingSink.notifyCheckpointComplete() method in this way, as shown below:

if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {
    boolean flag = true;
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(directory), false);
    while (files.hasNext()) {
        LocatedFileStatus file = files.next();
        String fileName = file.getPath().getName();
        if (fileName.lastIndexOf(".") != -1) {
            flag = false;
            break;
        }
    }
    Path path = new Path(directory + "/_SUCCESS");
    if (flag && !fs.exists(path)) {
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.flush();
        outputStream.close();
    }
    // We've dealt with all the pending files and the writer for this bucket is not currently open.
    // Therefore this bucket is currently inactive and we can remove it from our state.
    bucketStatesIt.remove();
}

But we have found a problem: occasionally, a file stays in the in-progress 
state, even though the amount of data processed by each subtask is almost the same 
and there is no data skew. There are no exceptions in the program.

Best,
Ben



> On 6 Feb 2018, at 5:50 PM, xiaobin yan  wrote:
> 
> Hi, 
> 
> Thanks for your reply! See here 
> :https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652
>  
> 
> After calling this method, the file is renamed, and we can't determine which 
> subtask finishes last.
> 
> Best,
> Ben
> 
>> On 6 Feb 2018, at 5:35 PM, Fabian Hueske > > wrote:
>> 
>> The notifyCheckpointComplete() method will be called when all subtasks of a 
>> job completed their checkpoints.
>> Check the JavaDocs of the CheckpointListener class [1].
>> 
>> Please note that you need to handle the case where multiple tasks try to 
>> create the _SUCCESS file concurrently.
>> So, there is a good chance of race conditions between file checks and 
>> modifications.
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
>>  
>> 
>> 
>> 2018-02-06 4:14 GMT+01:00 xiaobin yan > >:
>> Hi ,
>> 
>>  You've got a point. I saw that method, but how can I make sure that all 
>> the subtasks' checkpoints are finished, because I can only write the _SUCCESS file 
>> at that time.
>> 
>> Best,
>> Ben
>> 
>> 
>>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske >> > wrote:
>>> 
>>> In case of a failure, Flink rolls back the job to the last checkpoint and 
>>> reprocesses all data since that checkpoint. 
>>> Also the BucketingSink will truncate a file to the position of the last 
>>> checkpoint if the file system supports truncate. If not, it writes a file 
>>> with the valid length and starts a new file.
>>> 
>>> Therefore, all files that the BucketingSink finishes must be treated as 
>>> volatile until the next checkpoint is completed. 
>>> Only when a checkpoint is completed a finalized file may be read. The files 
>>> are renamed on checkpoint to signal that they are final and can be read. 
>>> This would also be the right time to generate a _SUCCESS file.
>>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>> 
>>> Best, Fabian
>>> 
>>> 
>>> 
>>> 
>>> 2018-02-05 6:43 GMT+01:00 xiaobin yan >> >:
>>> Hi ,
>>> 
>>> I have tested it. There are some small problems. When a checkpoint is 
>>> finished, the name of the file will change, and the success file will be 
>>> written before the checkpoint.
>>> 
>>> Best,
>>> Ben
>>> 
>>> 
 On 1 Feb 2018, at 8:06 PM, Kien Truong >>> > wrote:
 
 Hi,
 
 I did not actually test this, but I think with Flink 1.4 you can extend 
 BucketingSink and override the invoke method to access the watermark
 Pseudo code:
 invoke(IN value, SinkFunction.Context context) {
long currentWatermark = context.watermark()
long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
 if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 
 hour) {
Write _SUCCESS
lastSuccessWatermark = currentWatermark round down to 1 hour
 }
 invoke(value)
 }
 
 Regards,
 Kien
 On 1/31/2018 5:54 PM, xiaobin yan wrote:
> Hi:
> 
> I think so too! But I have a question: when should I add this logic

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread xiaobin yan
Hi, 

Thanks for your reply! See here 
:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652
 

  After calling this method, the file is renamed, and we can't determine which 
subtask finishes last.

Best,
Ben

> On 6 Feb 2018, at 5:35 PM, Fabian Hueske  wrote:
> 
> The notifyCheckpointComplete() method will be called when all subtasks of a 
> job completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
> 
> Please note that you need to handle the case where multiple tasks try to 
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and 
> modifications.
> 
> Best, Fabian
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
>  
> 
> 
> 2018-02-06 4:14 GMT+01:00 xiaobin yan  >:
> Hi ,
> 
>   You've got a point. I saw that method, but how can I make sure that all 
> the subtasks' checkpoints are finished, because I can only write the _SUCCESS file 
> at that time.
> 
> Best,
> Ben
> 
> 
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske > > wrote:
>> 
>> In case of a failure, Flink rolls back the job to the last checkpoint and 
>> reprocesses all data since that checkpoint. 
>> Also the BucketingSink will truncate a file to the position of the last 
>> checkpoint if the file system supports truncate. If not, it writes a file 
>> with the valid length and starts a new file.
>> 
>> Therefore, all files that the BucketingSink finishes must be treated as 
>> volatile until the next checkpoint is completed. 
>> Only when a checkpoint is completed a finalized file may be read. The files 
>> are renamed on checkpoint to signal that they are final and can be read. 
>> This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>> 
>> Best, Fabian
>> 
>> 
>> 
>> 
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan > >:
>> Hi ,
>> 
>>  I have tested it. There are some small problems. When a checkpoint is 
>> finished, the name of the file will change, and the success file will be 
>> written before the checkpoint.
>> 
>> Best,
>> Ben
>> 
>> 
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong >> > wrote:
>>> 
>>> Hi,
>>> 
>>> I did not actually test this, but I think with Flink 1.4 you can extend 
>>> BucketingSink and override the invoke method to access the watermark
>>> Pseudo code:
>>> invoke(IN value, SinkFunction.Context context) {
>>>long currentWatermark = context.watermark()
>>>long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>> if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) 
>>> {
>>>Write _SUCCESS
>>>lastSuccessWatermark = currentWatermark round down to 1 hour
>>> }
>>> invoke(value)
>>> }
>>> 
>>> Regards,
>>> Kien
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
 Hi:
 
 I think so too! But I have a question: when should I add this logic in 
 BucketingSink? And who should perform this logic, and how do we ensure that it is 
 executed only once, rather than by every parallel instance of the sink?
 
 Best,
 Ben
 
> On 31 Jan 2018, at 5:58 PM, Hung  
>  wrote:
> 
> It depends on how you partition your file. In my case I write a file per 
> hour,
> so I'm sure that file is ready after that hour period, in processing time.
> Here, ready to read means this file contains all the data in that hour
> period.
> 
> If the downstream runs in a batch way, you may want to ensure the file is
> ready.
> In this case, ready to read can mean all the data before the watermark has
> arrived.
> You could take the BucketingSink and implement this logic there, maybe 
> wait
> until watermark
> reaches
> 
> Best,
> 
> Sendoh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
>> 
>> 
> 
> 



Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-06 Thread Fabian Hueske
The notifyCheckpointComplete() method will be called when all subtasks of a
job completed their checkpoints.
Check the JavaDocs of the CheckpointListener class [1].

Please note that you need to handle the case where multiple tasks try to
create the _SUCCESS file concurrently.
So, there is a good chance of race conditions between file checks and
modifications.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
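
For illustration, one way to guard against that race is to make the marker creation itself atomic. A minimal, untested sketch, assuming a Hadoop FileSystem; the class and method names are placeholders:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Untested sketch; SuccessMarkers and writeSuccessMarker are placeholder names.
public final class SuccessMarkers {

    private SuccessMarkers() {
    }

    public static void writeSuccessMarker(FileSystem fs, Path markerDir) throws IOException {
        Path marker = new Path(markerDir, "_SUCCESS");
        try {
            // overwrite = false turns check-and-create into a single atomic step
            // on file systems that support it (e.g. HDFS).
            fs.create(marker, false).close();
        } catch (IOException e) {
            // Most file systems fail here when the marker already exists; rethrow
            // only if the marker is genuinely missing.
            if (!fs.exists(marker)) {
                throw e;
            }
        }
    }
}

Restricting the call to a single subtask (e.g. getRuntimeContext().getIndexOfThisSubtask() == 0) reduces contention further, but the atomic create is what removes the race between the file check and the modification.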

2018-02-06 4:14 GMT+01:00 xiaobin yan :

> Hi ,
>
> You've got a point. I saw that method, but how can I make sure that all
> the subtasks' checkpoints are finished, because I can only write the _SUCCESS
> file at that time.
>
> Best,
> Ben
>
>
> On 5 Feb 2018, at 6:34 PM, Fabian Hueske  wrote:
>
> In case of a failure, Flink rolls back the job to the last checkpoint and
> reprocesses all data since that checkpoint.
> Also the BucketingSink will truncate a file to the position of the last
> checkpoint if the file system supports truncate. If not, it writes a file
> with the valid length and starts a new file.
>
> Therefore, all files that the BucketingSink finishes must be treated as
> volatile until the next checkpoint is completed.
> Only when a checkpoint is completed a finalized file may be read. The
> files are renamed on checkpoint to signal that they are final and can be
> read. This would also be the right time to generate a _SUCCESS file.
> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>
> Best, Fabian
>
>
>
>
> 2018-02-05 6:43 GMT+01:00 xiaobin yan :
>
>> Hi ,
>>
>> I have tested it. There are some small problems. When a checkpoint is
>> finished, the name of the file will change, and the success file will be
>> written before the checkpoint.
>>
>> Best,
>> Ben
>>
>>
>> On 1 Feb 2018, at 8:06 PM, Kien Truong  wrote:
>>
>> Hi,
>>
>> I did not actually test this, but I think with Flink 1.4 you can extend
>> BucketingSink and override the invoke method to access the watermark
>>
>> Pseudo code:
>>
>> invoke(IN value, SinkFunction.Context context) {
>>
>>long currentWatermark = context.watermark()
>>
>>long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>
>> if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>
>>Write _SUCCESS
>>
>>lastSuccessWatermark = currentWatermark round down to 1 hour
>>
>> }
>>
>> invoke(value)
>>
>> }
>>
>>
>> Regards,
>> Kien
>>
>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>
>> Hi:
>>
>> I think so too! But I have a question: when should I add this logic in 
>> BucketingSink? And who should perform this logic, and how do we ensure that it is 
>> executed only once, rather than by every parallel instance of the sink?
>>
>> Best,
>> Ben
>>
>>
>> On 31 Jan 2018, at 5:58 PM, Hung  
>>  wrote:
>>
>> It depends on how you partition your file. In my case I write a file per hour,
>> so I'm sure that file is ready after that hour period, in processing time.
>> Here, ready to read means this file contains all the data in that hour
>> period.
>>
>> If the downstream runs in a batch way, you may want to ensure the file is
>> ready.
>> In this case, ready to read can mean all the data before the watermark has
>> arrived.
>> You could take the BucketingSink and implement this logic there, maybe wait
>> until watermark
>> reaches
>>
>> Best,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>
>


Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Tony Wei
Hi Timo,

Thanks a lot. I will try it out.

Best Regards,
Tony Wei

2018-02-06 17:25 GMT+08:00 Timo Walther :

> With heap-based state I meant state that is stored using the
> MemoryStateBackend or FsStateBackend [1]. In general, even if you are just
> using ValueState, the key might be used internally to store your value
> state in a hash table.
>
> I think the migration should work in your case. Otherwise feel free to let
> us know.
>
> Regards,
> Timo
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/state/state_backends.html#the-memorystatebackend
>
>
> Am 2/6/18 um 8:54 AM schrieb Tony Wei:
>
> Hi Timo,
>
> Thanks for your response. I will implement equals for my POJO directly. Would
> that be okay instead of wrapping it into another class?
> Furthermore, I want to migrate the states from the previous job. Will it
> lead to state loss? I run my job on Flink 1.4.0. I used RocksDBStateBackend
> and only ValueState as keyed state.
>
> BTW, could you please give more explanation about what heap-based state
> is? Since I'm not familiar with the details behind the state
> implementations, it would be great if you could share more technical details
> or some references with me. Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-02-06 15:24 GMT+08:00 Timo Walther :
>
>> Hi Tony,
>>
>> not having a proper equals() method might work for a keyBy()
>> (partitioning operation) but it can lead to unexpected side effects when
>> dealing with state. If not now, then maybe in the future. For example,
>> heap-based state uses hash table data structures, so your key might
>> never be found again. I would recommend wrapping your POJO in another
>> class that implements a proper hashCode/equals.
>>
>> Regards,
>> Timo
>>
>>
>> Am 2/6/18 um 4:16 AM schrieb Tony Wei:
>>
>> Hi all,
>>>
>>> I have defined a POJO class that overrides Object#hashCode and used it in
>>> keyBy().
>>> The pipeline looks good (i.e. no exception that told me it is
>>> UNSUPPORTED key types), but I'm afraid that it will lead to a problem where
>>> elements that I think have the same key will not get the same state because
>>> I didn't override Object#equals.
>>>
>>> Is it necessary that a POJO key type overrides Object#equals? Or does
>>> PojoTypeInfo not rely on MyClass#equals? Or does keyBy() not rely on
>>> equals?
>>>
>>> Thank you very much.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>
>>
>>
>
>


Re: Will that be a problem if POJO key type didn't override equals?

2018-02-06 Thread Timo Walther
With heap-based state I meant state that is stored using the 
MemoryStateBackend or FsStateBackend [1]. In general, even if you are 
just using ValueState, the key might be used internally to store your 
value state in a hash table.


I think the migration should work in your case. Otherwise feel free to 
let us know.


Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html#the-memorystatebackend
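
For illustration, a minimal sketch of a POJO key that overrides hashCode and equals consistently; the field names are placeholders:

import java.util.Objects;

// Minimal sketch; userId and region are placeholder fields.
public class MyKey {

    public String userId; // POJO rules: public fields (or getters/setters)
    public String region;

    public MyKey() {
        // POJO rules: public no-argument constructor
    }

    @Override
    public int hashCode() {
        return Objects.hash(userId, region);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MyKey)) {
            return false;
        }
        MyKey other = (MyKey) o;
        return Objects.equals(userId, other.userId)
                && Objects.equals(region, other.region);
    }
}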



Am 2/6/18 um 8:54 AM schrieb Tony Wei:

Hi Timo,

Thanks for your response. I will implement equals for my POJO 
directly. Would that be okay instead of wrapping it into another class?
Furthermore, I want to migrate the states from the previous job. Will 
it lead to state loss? I run my job on Flink 1.4.0. I used 
RocksDBStateBackend and only ValueState as keyed state.


BTW, could you please give more explanation about what heap-based 
state is? Since I'm not familiar with the details behind the state 
implementations, it would be great if you could share more technical 
details or some references with me. Thank you!


Best Regards,
Tony Wei

2018-02-06 15:24 GMT+08:00 Timo Walther >:


Hi Tony,

not having a proper equals() method might work for a keyBy()
(partitioning operation) but it can lead to unexpected side
effects when dealing with state. If not now, then maybe in the
future. For example, heap-based state uses hash table data
structures, so your key might never be found again. I would
recommend wrapping your POJO in another class that implements a
proper hashCode/equals.

Regards,
Timo


Am 2/6/18 um 4:16 AM schrieb Tony Wei:

Hi all,

I have defined a POJO class that overrides Object#hashCode and
used it in keyBy().
The pipeline looks good (i.e. no exception that told me it is
UNSUPPORTED key types), but I'm afraid that it will lead to a
problem where elements that I think have the same key will not
get the same state because I didn't override Object#equals.

Is it necessary that a POJO key type overrides Object#equals? Or does
PojoTypeInfo not rely on MyClass#equals? Or does keyBy() not
rely on equals?

Thank you very much.

Best Regards,
Tony Wei








Re: Rebalance to subtasks in same TaskManager instance

2018-02-06 Thread Piotr Nowojski
Hi,

Unfortunately I don’t think it’s currently possible in Flink. Please feel 
free to submit a feature request for it on our JIRA 
https://issues.apache.org/jira/projects/FLINK/summary 


Have you tried out the setup using rebalance? In most cases the overhead of 
rebalance over rescale is not as high as one might think.

Piotrek

> On 5 Feb 2018, at 15:16, johannes.barn...@clarivate.com wrote:
> 
> Hi,
> 
> I have a streaming topology with source parallelism of M and a target
> operator parallelism of N.
> For optimum performance I have found that I need to choose M and N
> independently.
> Also, the source subtasks do not all produce the same number of records and
> therefore I have to rebalance to the target operator to get optimum
> throughput.
> 
> The record sizes vary a lot (up to 10MB) but are about 200kB on average.
> 
> Through experimentation using the rescale() operator I have found that
> maximum throughput can be significantly increased if I restrict this
> rebalancing to target subtasks within the same TaskManager instances.
> 
> However I cannot use rescale for this purpose as it does not do a
> rebalancing to all target subtasks in the instance.
> 
> I was hoping to use a custom Partitioner to achieve this but it is not clear
> to me which partition would correspond to which subTask.
> 
> Is there any way currently to achieve this with Flink? 
> 
> If it helps I believe the feature I am hoping to achieve is similar to
> Storm's "Local or shuffle grouping".
> 
> Any help or suggestions will be appreciated.
> Hans
> 
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Reduce parallelism without network transfer.

2018-02-06 Thread Piotr Nowojski
Hi,

Rebalance is a safer default setting that protects against data skew. Even 
the smallest data skew can create a bottleneck much larger than the 
serialisation/network transfer cost, especially if one changes the parallelism 
to a value that’s not a result of multiplication or division (like N down to 
N-1). Data skew can be arbitrarily large, while the overhead of rebalance compared 
to rescale is limited.

Piotrek
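
For reference, the difference between the two is just the partitioning call on the stream. A rough sketch, where the generated source and the parallelisms 4 and 2 stand in for the real job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Rough sketch contrasting rescale() and rebalance(); source and parallelisms
// are placeholders for the real pipeline.
public class RescaleVsRebalance {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> upstream = env.generateSequence(0, 1000).setParallelism(4);

        // rescale(): each downstream subtask is wired to a fixed subset of
        // upstream subtasks, so traffic can stay inside a TaskManager.
        upstream.rescale().print().setParallelism(2);

        // rebalance(): round-robin over all downstream subtasks; evens out data
        // skew at the cost of shipping most records across the network.
        // upstream.rebalance().print().setParallelism(2);

        env.execute("rescale vs rebalance sketch");
    }
}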


> On 6 Feb 2018, at 04:32, Kien Truong  wrote:
> 
> Thanks Piotr, it works.
> May I ask why the default behavior when reducing parallelism is rebalance, and 
> not rescale?
> 
> Regards,
> Kien
> 
> Sent from TypeApp 
> On Feb 5, 2018, at 15:28, Piotr Nowojski  > wrote:
> Hi,
> 
> It should work like this out of the box if you use rescale method:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
>  
> 
> 
> If it will not work, please let us know.
> 
> Piotrek 
> 
>> On 3 Feb 2018, at 04:39, Kien Truong < duckientru...@gmail.com 
>> > wrote:
>> 
>> Hi, 
>> 
>> Assuming that I have a streaming job using 30 task managers with 4 slots 
>> each, I want to change the parallelism of one operator from 120 to 30. Is 
>> there any way so that each subtask of this operator gets data from 4 upstream 
>> subtasks running in the same task manager, thus avoiding the network completely? 
>> 
>> Best regards, 
>> Kien 
>> 
>> Sent from TypeApp