
Issues in Batch Jobs Submission for a Session Cluster

2021-12-01 Thread Ghiya, Jay (GE Healthcare)
Hi Flink Team,

Greetings from GE Healthcare team.

Here is a stackoverflow post for the same too posted by fellow dev here : 
https://stackoverflow.com/questions/70068336/flink-job-not-getting-submitted-java-io-ioexception-cannot-allocate-memory

Summary of the post:

Here is the usecase and relevant configuration:

  1.  A Flink session cluster in Kubernetes is used to submit batch jobs every 
1 minute. Run time for a batch job is <30 seconds.
  2.  This Flink session cluster runs in an HA setup. This means it stores the 
job graph and its related metadata in the Flink "/recovery/default/blob/" folder 
for each job that is submitted.
  3.  A 5 GB PVC is attached to this session cluster for HA; it is backed by 
CephFS (https://docs.ceph.com/en/pacific/cephfs/index.html), with Rook used for 
orchestration. A configuration sketch for this setup follows below.
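
For context, the session cluster's HA settings look roughly like the flink-conf.yaml 
fragment below (the cluster id and storage path are illustrative placeholders rather than 
the exact values of our deployment):

kubernetes.cluster-id: my-session-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# HA metadata and job blobs are written under this directory (the CephFS-backed PVC mount)
high-availability.storageDir: file:///recovery/default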

Ideal Working Scenario:

  1.  Upon a successful job submission, the metadata is created and cleared 
after completion. The average size of the blob created under the recovery folder 
for HA is 150 MB, so there is enough space in the PVC.

Failure:

  1.  During a long run, say 100 minutes and therefore 100 job submissions, a 
Flink job submission will fail with:


2021-11-22 09:03:11,537 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished 
cleaning up the high availability data for job 6a71a36a3c82d8a9438c9aa9ed6b8993.
2021-11-22 09:03:14,904 ERROR 
org.apache.flink.runtime.blob.BlobServerConnection   [] - PUT operation 
failed
java.io.IOException: Cannot allocate memory
at java.io.FileOutputStream.writeBytes(Native Method) ~[?:1.8.0_312]
at java.io.FileOutputStream.write(FileOutputStream.java:326) ~[?:1.8.0_312]
at 
org.apache.flink.core.fs.local.LocalDataOutputStream.write(LocalDataOutputStream.java:55)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.guava30.com.google.common.io.ByteStreams.copy(ByteStreams.java:113)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.guava30.com.google.common.io.ByteSource.copyTo(ByteSource.java:243)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.shaded.guava30.com.google.common.io.Files.copy(Files.java:301) 
~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:79)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:72)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:385) 
~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:680)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:350)
 [flink-dist_2.11-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110)
 [flink-dist_2.11-1.14.0.jar:1.14.0]

NOTE that the PVC has not become full at this point. Also, the next job 
submission succeeds without complaining about storage being full. But because of 
these random failures, the old blobs eventually pile up in the recovery folder, 
making the PVC full, after which all job submissions fail.

Request immediate help here, folks. Any pointers on why this behavior occurs? We 
are relying on HA to make sure each job runs fine; this is mission-critical data.
Secondly, we would like to understand why the stale data is not being cleared. 
Is there a configuration we have missed? This would not solve the data loss we 
experience due to intermittent job submission failures, but it would make sure 
our PVC is not unnecessarily saturated to the point where all jobs fail.

Thank you in advance.

-Jay
GEHC







Re: Re: Question about files generated by streamFileSink under checkpoint

2021-12-01 Thread 黄志高
32684 | COMPLETED | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B
32683 | COMPLETED | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B
32682 | COMPLETED | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B
32681 | COMPLETED | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B
32680 | COMPLETED | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B
32679 | COMPLETED | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B

The rows above are the checkpoint history of the job.


These are the files generated during the 00:00 hour on November 30:
2021-11-30 00:00:01  1080827  athena_other-0-217891.gz
2021-11-30 00:02:42  4309209  athena_other-0-217892.gz
2021-11-30 00:12:40  3902474  athena_other-0-217893.gz
2021-11-30 00:22:40  3886322  athena_other-0-217894.gz
2021-11-30 00:32:40  3988037  athena_other-0-217895.gz
2021-11-30 00:42:40  3892343  athena_other-0-217896.gz
2021-11-30 00:52:39  2972183  athena_other-0-217897.gz
2021-11-30 00:00:01  1125774  athena_other-1-219679.gz
2021-11-30 00:02:42  4338748  athena_other-1-219680.gz
2021-11-30 00:12:40  4204571  athena_other-1-219681.gz
2021-11-30 00:22:40  3852791  athena_other-1-219682.gz
2021-11-30 00:32:40  4025214  athena_other-1-219683.gz
2021-11-30 00:42:40  4205107  athena_other-1-219684.gz
2021-11-30 00:52:39  2922192  athena_other-1-219685.gz
2021-11-30 00:00:01  1103734  athena_other-2-220084.gz


These are the files generated during the 01:00 hour:
2021-11-30 01:00:01  1228793  athena_other-0-217951.gz
2021-11-30 01:02:42  4243566  athena_other-0-217952.gz
2021-11-30 01:12:40  4106305  athena_other-0-217953.gz
2021-11-30 01:22:40  4456214  athena_other-0-217954.gz
2021-11-30 01:32:41  4303156  athena_other-0-217955.gz
2021-11-30 01:42:40  4688872  athena_other-0-217956.gz
2021-11-30 01:52:40  3251910  athena_other-0-217957.gz
2021-11-30 01:00:01  1163354  athena_other-1-219736.gz
2021-11-30 01:02:42  4405233  athena_other-1-219737.gz
2021-11-30 01:12:40  4094502  athena_other-1-219738.gz
2021-11-30 01:22:40  4395071  athena_other-1-219739.gz
2021-11-30 01:32:40  4205169  athena_other-1-219740.gz
2021-11-30 01:42:40  4432610  athena_other-1-219741.gz
2021-11-30 01:52:40  3224111  athena_other-1-219742.gz
2021-11-30 01:00:01  1163964  athena_other-2-220137.gz




The earlier screenshots could not be sent, so I have pasted the file listings instead. Sorry for the trouble.







On 2021-12-02 13:52:43, "黄志高" wrote:




Hi, I have put the files below. I understand that files only become visible at checkpoint time, and normally the files should be generated right after a checkpoint. However, I see data files at every full hour, as shown below. In theory the files should only be generated after checkpoints, i.e. after minutes 02, 12, 22, 32, 42 and 52, yet a data file is also generated at minute 00 of every hour. I do not understand how that file is produced; the internal rolling policy is OnCheckpointRollingPolicy.














On 2021-12-02 11:37:31, "Caizhi Weng" wrote:
>Hi!
>
>Images and attachments are not visible in the mail; please use an external image host.
>
>Do the part files start with a dot? That is because when streamingFileSink writes a file
>the checkpoint has not happened yet; to guarantee exactly-once, these temporary .part
>files are invisible and are only renamed to visible files after the checkpoint.
>
>黄志高 wrote on Wed, Dec 1, 2021 at 9:53 PM:
>
>> Hi everyone, I have a question.
>>
>> My Flink version is 1.11.0. My program reads from kafka -> s3 with a checkpoint
>> interval of 10 minutes and no intermediate operations; it simply consumes data and
>> writes it to the file system using streamingFileSink with the internal bulk format
>> builder. From the source code, the rolling policy used is OnCheckpointRollingPolicy.
>> However, within the hourly buckets a partFile is always generated exactly on the hour.
>> My checkpoints trigger at minutes 02, 12, 22, 32, 42 and 52, and files are indeed
>> generated at those times, but a file is also always created exactly on the hour. I
>> searched the source code and could not find any logic that rolls files on the hour.
>> Could someone help analyze where this on-the-hour file comes from and which period it
>> belongs to? Attached are the checkpoint times of my Flink job and screenshots of the
>> files generated at 1:00 and 2:00 on November 30, 2021; a file was generated at minute
>> 00 of both hours. Any help would be appreciated.
>>
>>
>>
>>





 

ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-01 Thread summer
When I integrated Flink 1.13.3 with CDH 6.3.2 and ran flink-sql, the following error appeared in the logs:


ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user 
programmatically provided configurations. Set system property 
'log4j2.debug' to show Log4j 2 internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for 
instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed


What could be the cause of this?


Re: Re: Error running the flink remote shuffle example

2021-12-01 Thread casel.chen
Sorry, it was caused by my local FLINK_HOME switch not taking effect. After opening a new terminal so the new environment variable took effect and retrying, everything works fine. Sorry for the noise.

















On 2021-12-02 10:35:53, "weijie guo" wrote:
>Hi, are you submitting the job in flink standalone mode? Also, is the flink you use the
>official Apache Flink 1.14.0 for Scala 2.12 downloaded from the website?
>
>casel.chen wrote on Thu, Dec 2, 2021 at 8:12 AM:
>
>> Following the guide at https://github.com/flink-extended/flink-remote-shuffle, I tried
>> to run a batch job against the flink remote shuffle service and got the error below. I
>> use scala 2.12 locally, so the command used to build flink-remote-shuffle was: mvn
>> clean install -DskipTests -Dscala.binary.version=2.12
>>
>> The class that cannot be found is in flink-streaming-java_2.12-1.14.0.jar; putting that
>> jar under $FLINK_HOME/lib did not help either. The local flink version is 1.14.0.
>>
>>
>>
>> java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode
>>
>> at
>> com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> ... 14 more


Re: Question about files generated by streamFileSink under checkpoint

2021-12-01 Thread Caizhi Weng
Hi!

Images and attachments are not visible in the mail; please use an external image host.

Do the part files start with a dot? That is because when streamingFileSink writes a file
the checkpoint has not happened yet; to guarantee exactly-once, these temporary .part
files are invisible and are only renamed to visible files after the checkpoint.
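
As an illustration, a bulk-format sink of the kind described in the question is set up
roughly as follows; the event type and writer factory here are placeholders and not taken
from the original job:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class BulkSinkSketch {

    // Placeholder event type; the real job writes its own records.
    public static class MyEvent {
        public long ts;
        public String payload;
    }

    public static void attachSink(DataStream<MyEvent> stream) {
        StreamingFileSink<MyEvent> sink = StreamingFileSink
                // Bulk formats roll on checkpoint: in-progress data is written to hidden,
                // dot-prefixed part files and only renamed to visible files once the
                // covering checkpoint completes.
                .forBulkFormat(
                        new Path("s3://my-bucket/output"),
                        ParquetAvroWriters.forReflectRecord(MyEvent.class))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
        stream.addSink(sink);
    }
}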

黄志高 wrote on Wed, Dec 1, 2021 at 9:53 PM:

> Hi everyone, I have a question.
>
> My Flink version is 1.11.0. My program reads from kafka -> s3 with a checkpoint interval
> of 10 minutes and no intermediate operations; it simply consumes data and writes it to
> the file system using streamingFileSink with the internal bulk format builder. From the
> source code, the rolling policy used is OnCheckpointRollingPolicy. However, within the
> hourly buckets a partFile is always generated exactly on the hour. My checkpoints
> trigger at minutes 02, 12, 22, 32, 42 and 52, and files are indeed generated at those
> times, but a file is also always created exactly on the hour. I searched the source code
> and could not find any logic that rolls files on the hour. Could someone help analyze
> where this on-the-hour file comes from and which period it belongs to? Attached are the
> checkpoint times of my Flink job and screenshots of the files generated at 1:00 and 2:00
> on November 30, 2021; a file was generated at minute 00 of both hours.
>
>
>
>


Re: Can someone explain the differences and use cases of the GlobalStreamExchangeMode exchange modes?

2021-12-01 Thread Yingjie Cao
This enum directly controls how the internal job edges are connected; see its Javadoc
below. However, it is an internal interface, so the recommendation is to use
env.setRuntimeMode(RuntimeExecutionMode.BATCH) instead. See this documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
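
As an illustration, a minimal sketch of the recommended approach (the job body is just a
placeholder):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Let Flink choose blocking exchanges and batch-style scheduling internally,
        // instead of touching the internal GlobalStreamExchangeMode directly.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3).print();

        env.execute("batch-mode sketch");
    }
}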

public enum GlobalStreamExchangeMode {
/** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
ALL_EDGES_BLOCKING,

/**
 * Set job edges with {@link ForwardPartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
FORWARD_EDGES_PIPELINED,

/**
 * Set job edges with {@link ForwardPartitioner} or {@link
RescalePartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
POINTWISE_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
ALL_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_APPROXIMATE}. */
ALL_EDGES_PIPELINED_APPROXIMATE
}


casel.chen wrote on Thu, Dec 2, 2021 at 8:26 AM:

> What are the differences and use cases of the GlobalStreamExchangeMode exchange modes?
> Which are suitable for streaming jobs and which for batch jobs?
> Does the release of the Flink Remote Shuffle Service mean Flink can now be used to
> process batch jobs in production? Thanks!
>
>
> package org.apache.flink.streaming.api.graph;
>
>
>
>
> import org.apache.flink.annotation.Internal;
>
>
>
>
> @Internal
>
> public enum GlobalStreamExchangeMode {
>
> ALL_EDGES_BLOCKING,
>
> FORWARD_EDGES_PIPELINED,
>
> POINTWISE_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED_APPROXIMATE;
>
>
>
>
> private GlobalStreamExchangeMode() {
>
> }
>
> }
>
>
>


Re: Error running the flink remote shuffle example

2021-12-01 Thread Yingjie Cao
It looks like flink-streaming-java is not on the class path. If you use a Flink downloaded
from the official website and submit the job with ./bin/flink run directly from
FLINK_HOME, the error should not occur. Also, as I understand it, this error has little to
do with remote shuffle itself; it would probably be thrown even without remote shuffle. My
suggestion is to download a Flink release from the official website and try again.


Re: Error running the flink remote shuffle example

2021-12-01 Thread weijie guo
Hi, are you submitting the job in flink standalone mode? Also, is the flink you use the
official Apache Flink 1.14.0 for Scala 2.12 downloaded from the website?

casel.chen wrote on Thu, Dec 2, 2021 at 8:12 AM:

> Following the guide at https://github.com/flink-extended/flink-remote-shuffle, I tried
> to run a batch job against the flink remote shuffle service and got the error below. I
> use scala 2.12 locally, so the command used to build flink-remote-shuffle was: mvn
> clean install -DskipTests -Dscala.binary.version=2.12
>
> The class that cannot be found is in flink-streaming-java_2.12-1.14.0.jar; putting that
> jar under $FLINK_HOME/lib did not help either. The local flink version is 1.14.0.
>
>
>
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode
>
> at
> com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>
> ... 14 more


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Hang Ruan
Sorry, I spelled it wrong; I meant the PR. Here it is:
https://github.com/apache/flink/pull/17276
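
For reference, a minimal sketch of the builder usage discussed in this thread; the broker,
topic, group id and checkpoint interval are placeholders, and the property only takes
effect on versions that contain the fix above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are part of checkpoints either way; committing back to Kafka is extra.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")                 // placeholder
                .setTopics("my-topic")                              // placeholder
                .setGroupId("my-group")                             // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Also commit consumed offsets back to Kafka when a checkpoint completes.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka source sketch");
    }
}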

Marco Villalobos wrote on Wed, Dec 1, 2021 at 9:18 PM:

> Thank you. One last question.  What is an RP? Where can I read it?
>
> Marco
>
> On Nov 30, 2021, at 11:06 PM, Hang Ruan  wrote:
>
> Hi,
>
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint
> is open is the default behavior in KafkaSourceBuilder. And it can not be
> changed in KafkaSourceBuilder.
>
> By this FLINK-24277 ,
> we could change the behavior. This problem will be fixed in 1.12.6. It
> seems not to be contained in your version.
>
> Reading the RP will be helpful for you to understand the behavior.
>
>
> Marco Villalobos wrote on Wed, Dec 1, 2021 at 3:43 AM:
>
>> Thanks!
>>
>> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
>> does not exist in Flink 1.12.
>>
>> Is that property supported with the string
>> "commit.offsets.on.checkpoints"?
>>
>> How do I configure that behavior so that offsets get committed on
>> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
>> default behavior with checkpoints?
>>
>>
>>
>>
>> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  wrote:
>>
>>> Hi,
>>>
>>> Maybe you can write like this :
>>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>>> "true");
>>>
>>> Other additional properties could be found here :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> Marco Villalobos wrote on Tue, Nov 30, 2021 at 11:08 AM:
>>>
 Thank you for the information.  That still does not answer my question
 though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
 that consumer should commit offsets back to Kafka on checkpoints?

 FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
 method.

 But now that I am using KafkaSourceBuilder, how do I configure that
 behavior so that offsets get committed on checkpoints?  Or is that the
 default behavior with checkpoints?

 -Marco

 On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng 
 wrote:

> Hi!
>
> Flink 1.14 release note states about this. See [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:
>
>> Hi everybody,
>>
>> I am using Flink 1.12 and migrating my code from using
>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>
>> FlinkKafkaConsumer has the method
>>
>> /**
>>>  * Specifies whether or not the consumer should commit offsets back
>>> to Kafka on checkpoints.
>>>  * This setting will only have effect if checkpointing is enabled
>>> for the job. If checkpointing isn't
>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>> "enable.auto.commit" (for 0.9+) property
>>>  * settings will be used.
>>> */
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>
>>
>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>> already have checkpointing configured, is it necessary to setup "commit
>> offsets on checkpoints"?
>>
>> The Flink 1.12 documentation does not discuss this topic, and the
>> Flink 1.14 documentation says little about it.
>>
>>  For example, the Flink 1.14 documentation states:
>>
>> Additional Properties
>>> In addition to properties described above, you can set arbitrary
>>> properties for KafkaSource and KafkaConsumer by using
>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>> has
>>> following options for configuration:
>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>> offsets to Kafka brokers on checkpoint
>>
>>
>> And the 1.12 documentation states:
>>
>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>> consume records from a topic and periodically checkpoint all its Kafka
>>> offsets, together with the state of other operations. In case of a job
>>> failure, Flink will restore the streaming program to the state of the
>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>> the
>>> offsets that were stored in the checkpoint.
>>> The interval of drawing checkpoints therefore defines how much the
>>> program may have to go back at most, in case of a failure. To use fault
>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>> enabled
>>> in the job.
>>> If checkpointing is disabled, the Kafka consumer will periodically
>>> commit the offsets to Zookeeper.
>>
>>
>> Thank you.
>>
>> Marco
>>
>>
>>
>


Re: Question about using the flink sql collect function

2021-12-01 Thread cyril cui
In the AggregateFunction, make the accumulator a list, merge the lists in merge(), and concatenate the list into a string when emitting the result.
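
A rough sketch of that approach as a Table API AggregateFunction; the class, accumulator
and JSON formatting below are illustrative rather than an existing implementation:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;

// Collects the students of one group into a JSON-array string.
public class CollectStudentsToJson
        extends AggregateFunction<String, CollectStudentsToJson.ListAcc> {

    public static class ListAcc {
        public List<String> values = new ArrayList<>();
    }

    @Override
    public ListAcc createAccumulator() {
        return new ListAcc();
    }

    // Called once per input row of the group.
    public void accumulate(ListAcc acc, Integer studentNo, String name, Integer age) {
        acc.values.add(String.format(
                "{\"student_no\": %d, \"name\": \"%s\", \"age\": %d}", studentNo, name, age));
    }

    // Merges partial accumulators, e.g. under two-phase aggregation.
    public void merge(ListAcc acc, Iterable<ListAcc> others) {
        for (ListAcc other : others) {
            acc.values.addAll(other.values);
        }
    }

    @Override
    public String getValue(ListAcc acc) {
        return "[" + String.join(", ", acc.values) + "]";
    }
}

After registering the function, it could be used roughly as:
select class_no, collect_students(student_no, name, age) from source_table group by class_no;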

casel.chen wrote on Thu, Dec 2, 2021 at 9:46 AM:

> The use case is as follows: a kafka source table is processed with flink sql and written
> into a mongodb sink table. The data is grouped by class, and for each class the full set
> of student records should be emitted. Can this be done with the built-in collect
> function in flink sql? If so, how should the sql be written? If not, how should a UDAF
> be written; is there an example to refer to? Thanks!
>
> kafka source table:
> class_no  student_no  name  age
> 1  20001  张三  15
> 2  20011  李四  16
> 1  20002  王五  16
> 2  20012  吴六  15
>
> create table source_table (
>class_no: INT,
>student_no: INT,
>name: STRING,
>age: INT
> ) with (
>'connector' = 'kafka',
>...
> );
>
>
>
> Processed and output via flink sql ==>
>
>
> mongodb target table:
> class_no  students
> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
> 20002, "name":"王五", "age": 16}]
> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
> 20012, "name":"吴六", "age": 15}]
>
> create table sink_table (
>   class_no INT,
>   students: ARRAY>
> ) with (
>   'connector' = 'mongodb',
>   ...
> );
>
>


Question about using the flink sql collect function

2021-12-01 Thread casel.chen
The use case is as follows: a kafka source table is processed with flink sql and written
into a mongodb sink table. The data is grouped by class, and for each class the full set
of student records should be emitted. Can this be done with the built-in collect function
in flink sql? If so, how should the sql be written? If not, how should a UDAF be written;
is there an example to refer to? Thanks!

kafka source table:
class_no  student_no  name  age
1  20001  张三  15
2  20011  李四  16
1  20002  王五  16
2  20012  吴六  15

create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);



Processed and output via flink sql ==>


mongodb target table:
class_no  students
1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]

create table sink_table (
  class_no INT,
  students: ARRAY>
) with (
  'connector' = 'mongodb',
  ...
);




Can someone explain the differences and use cases of the GlobalStreamExchangeMode exchange modes?

2021-12-01 Thread casel.chen
What are the differences and use cases of the GlobalStreamExchangeMode exchange modes?
Which are suitable for streaming jobs and which for batch jobs?
Does the release of the Flink Remote Shuffle Service mean Flink can now be used to process
batch jobs in production? Thanks!


package org.apache.flink.streaming.api.graph;




import org.apache.flink.annotation.Internal;




@Internal

public enum GlobalStreamExchangeMode {

ALL_EDGES_BLOCKING,

FORWARD_EDGES_PIPELINED,

POINTWISE_EDGES_PIPELINED,

ALL_EDGES_PIPELINED,

ALL_EDGES_PIPELINED_APPROXIMATE;




private GlobalStreamExchangeMode() {

}

}




Error running the flink remote shuffle example

2021-12-01 Thread casel.chen
Following the guide at https://github.com/flink-extended/flink-remote-shuffle, I tried to
run a batch job against the flink remote shuffle service and got the error below. I use
scala 2.12 locally, so the command used to build flink-remote-shuffle was: mvn clean
install -DskipTests -Dscala.binary.version=2.12
The class that cannot be found is in flink-streaming-java_2.12-1.14.0.jar; putting that
jar under $FLINK_HOME/lib did not help either. The local flink version is 1.14.0.



java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode

at com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)

at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

... 14 more

Re: [DISCUSS] Deprecate Java 8 support

2021-12-01 Thread Chesnay Schepler
Flink can be built with Java 11 since 1.10. If I recall correctly we 
solved the tools.jar issue, which Hadoop depends on, by excluding that 
dependency. As far as we could tell it's not actually required.


On 01/12/2021 19:56, Nicolás Ferrario wrote:
Hi all, this would be awesome, I'm so tired of seeing Java 8 
everywhere (reminds me of Python 2.7).


We're currently building our code against Java 11 because that's the 
latest version of Java available as a Flink Docker image, but it'd be 
great to use newer versions. I think it would also help to clean up 
dependencies and hopefully no longer have incompatibility issues.
For example, right now it's not possible to build Flink with Java 9+ 
because of a Maven dependency. Using JDK 8 or copying "tools.jar" to 
any newer JDK version fixes it (see more: 
https://stackoverflow.com/questions/53707666/how-to-get-tools-jar-for-openjdk-11-on-windows).


Official support for Java 17 would be great.

Greetings!

On Wed, Dec 1, 2021 at 7:51 AM Chesnay Schepler  
wrote:


Hello Gavin,

If you run into any issues with Java 17, please report them in
FLINK-15736 .
I recently did some experiments with Java 17 myself; I would think
that you will run into some blockers (like ASM requiring an
upgrade , or
missing --add-opens/--add-exports
).

On 01/12/2021 11:12, Gavin Lee wrote:

Thanks for sharing this info with us Chesnay.
We've been using Flink for 5 years,  and upgraded to 1.13.2
months ago. The java version is still 8.
Currently we're testing with java 17 in our staging environment.
There are no special concerns.
Will update when tests complete.

On Tue, Nov 30, 2021 at 1:18 AM Chesnay Schepler
 wrote:

Hello,

we recently had a discussion on the dev mailing list for
deprecating support for Java 8 in 1.15, with a general
consensus in favor of it.

I now wanted to check in with you, our users, to see what you
have got to say about that.


Why are we interested in deprecating Java 8 support
now (and in eventually removing it)?

The main reason is that supporting the recently released Java
17 (and subsequent versions), while maintaining Java 8 support,
will be more complicated than if Java 11 were the oldest
release version. Essentially because Java 11/17 have both
crossed the Java 9 chasm.

We will still have to bite this bullet in any case (because
Java 17 is out now but we are not dropping Java 8 now),
but we would still
like to signal that users should upgrade to Java 11 so that
we can eventually clean this up.

Furthermore, it is currently hard to justify investing time
into benchmarks/performance improvements that are specific to
Java 11+, because
they provide no benefit to Java 8.


What does the deprecation mean exactly?

It will primarily mean that a warning will be logged when you
run Flink on Java 8.
We may change the default Java version of the Docker images
to Java 11 (the java8 tags will remain),
and we will put a larger emphasis on Flink's performance on
Java 11.


Does that mean that Java 8 support will be removed in
1.16/1.17?

No. We are not putting a hard-date on the removal of Java 8
support at this time.


Will this mean that at some point we'll surprise you
with the removal of Java 8 support in the next release?

No. We will announce the removal ahead of time by at least
half a year / 2+ releases (probably closer to a full year).


Is the deprecation already decided?

No. The responses in this thread are integral for deciding
whether a deprecation at this time makes sense.


If you are still using Java 8 at the moment, then we would
appreciate if you could tell us whether you already have a
time-frame for
when you intend to upgrade to Java 11. We'd also be
interested in anything that blocks your migration to Java 11.


Please raise concerns you have, and feel free to ask questions.



-- 
Gavin





Re: [DISCUSS] Deprecate Java 8 support

2021-12-01 Thread Nicolás Ferrario
Hi all, this would be awesome, I'm so tired of seeing Java 8 everywhere
(reminds me of Python 2.7).

We're currently building our code against Java 11 because that's the latest
version of Java available as a Flink Docker image, but it'd be great to use
newer versions. I think it would also help to clean up dependencies and
hopefully no longer have incompatibility issues.
For example, right now it's not possible to build Flink with Java 9+
because of a Maven dependency. Using JDK 8 or copying "tools.jar" to any
newer JDK version fixes it (see more:
https://stackoverflow.com/questions/53707666/how-to-get-tools-jar-for-openjdk-11-on-windows
).

Official support for Java 17 would be great.

Greetings!

On Wed, Dec 1, 2021 at 7:51 AM Chesnay Schepler  wrote:

> Hello Gavin,
>
> If you run into any issues with Java 17, please report them in FLINK-15736
> .
> I recently did some experiments with Java 17 myself; I would think that
> you will run into some blockers (like ASM requiring an upgrade
> , or missing
> --add-opens/--add-exports
> ).
>
> On 01/12/2021 11:12, Gavin Lee wrote:
>
> Thanks for sharing this info with us Chesnay.
> We've been using Flink for 5 years,  and upgraded to 1.13.2 months ago.
> The java version is still 8.
> Currently we're testing with java 17 in our staging environment. There are
> no special concerns.
> Will update when tests complete.
>
>
> On Tue, Nov 30, 2021 at 1:18 AM Chesnay Schepler 
> wrote:
>
>> Hello,
>>
>> we recently had a discussion on the dev mailing list for deprecating
>> support for Java 8 in 1.15, with a general consensus in favor of it.
>>
>> I now wanted to check in with you, our users, to see what you have got to
>> say about that.
>>
>> Why are we interested in deprecating Java 8 support now (and in
>> eventually removing it)?
>>
>> The main reason is that supporting the recently released Java 17 (and
>> subsequent versions), while maintaining Java 8 support,
>> will be more complicated than if Java 11 were the oldest release version.
>> Essentially because Java 11/17 have both crossed the Java 9 chasm.
>>
>> We will still have to bite this bullet in any case (because Java 17 is
>> out *now *but we are *not *dropping Java 8 *now*), but we would still
>> like to signal that users should upgrade to Java 11 so that we can
>> *eventually* clean this up.
>>
>> Furthermore, it is currently hard to justify investing time into
>> benchmarks/performance improvements that are specific to Java 11+, because
>> they provide no benefit to Java 8.
>> What does the deprecation mean exactly?
>>
>> It will primarily mean that a warning will be logged when you run Flink
>> on Java 8.
>> We *may* change the default Java version of the Docker images to Java 11
>> (the java8 tags will remain),
>> and we will put a larger emphasis on Flink's performance on Java 11.
>> Does that mean that Java 8 support will be removed in 1.16/1.17?
>>
>> No. We are not putting a hard-date on the removal of Java 8 support at
>> this time.
>> Will this mean that at some point we'll surprise you with the removal of
>> Java 8 support in the next release?
>>
>> No. We will announce the removal ahead of time by *at least* half a year
>> / 2+ releases (probably closer to a full year).
>> Is the deprecation already decided?
>>
>> No. The responses in this thread are integral for deciding whether a
>> deprecation at this time makes sense.
>>
>>
>> If you are still using Java 8 at the moment, then we would appreciate if
>> you could tell us whether you already have a time-frame for
>> when you intend to upgrade to Java 11. We'd also be interested in
>> anything that blocks your migration to Java 11.
>>
>>
>> Please raise concerns you have, and feel free to ask questions.
>>
>
>
> --
> Gavin
>
>
>


Re: REST API for detached minicluster based integration test

2021-12-01 Thread Jin Yi
so i went ahead and put some logging in the WatermarkGenerator.onEvent and
.onPeriodicEmit functions in the test source watermark generator, and i do
see the watermarks come by with values through those functions.  they're
just not being returned as expected via the rest api.
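
for what it's worth, a minimal sketch of the operator caizhi suggests below (names are
mine, and the static list only works because the minicluster test runs in a single jvm):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Pass-through operator that records every watermark it sees, for test assertions.
public class WatermarkCapturingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    public static final List<Long> SEEN_WATERMARKS = new CopyOnWriteArrayList<>();

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element); // forward records unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        SEEN_WATERMARKS.add(mark.getTimestamp()); // record for later assertions
        super.processWatermark(mark);             // keep forwarding downstream
    }
}

it would be wired in with something like
stream.transform("capture-watermarks", stream.getType(), new WatermarkCapturingOperator<>()).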

On Tue, Nov 30, 2021 at 7:24 PM Caizhi Weng  wrote:

> Hi!
>
> I see. So to test your watermark strategy you would like to fetch the
> watermarks downstream.
>
> I would suggest taking a look at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator. This
> class has a processWatermark method, which is called when a watermark
> flows through this operator. You can make your own testing operator by
> extending this class and stuff the testing operator in a
> org.apache.flink.streaming.api.transformations.OneInputTransformation. In
> this case you do not need to fetch watermarks from the metrics. If
> processWatermark is never called then it means no watermark ever comes
> and you might want to check your watermark strategy implementation.
>
> Jin Yi wrote on Wed, Dec 1, 2021 at 4:14 AM:
>
>> thanks for the reply caizhi!
>>
>> we're on flink 1.12.3.  in the test, i'm using a custom watermark
>> strategy that is derived from BoundedOutOfOrdernessWatermarks that emits
>> watermarks using processing time after a period of no events to keep the
>> timer-reliant operators happy.  basically, it's using event time for
>> everything, but the inputs have watermarks periodically output if there's
>> no events coming in through them.
>>
>> we started w/ test data w/ their own event times in the tests and simply
>> used the SEE.fromCollection with a timestamp assigner that extracts the
>> timestamp from the test event data.  however, doing things this way, the
>> minicluster during the test terminates (and completes the test) once all
>> the input is processed, even though there are timers in the operators that
>> are meant to supply additional output still outstanding.  so, that's why i
>> cobbled together an attempt to control when the input sources are complete
>> by using the posted WaitingSourceFunction to send the signal to
>> close/cancel the input stream based on some form of state checking on the
>> job (which is where this thread starts).
>>
>> what's the best way to achieve what i need?  i would love to set the
>> inputs up so that watermarks get emitted appropriately throughout the
>> processing of the test data as well as for a defined period after all the
>> "input" is complete such that the timer-reliant operators get their timers
>> triggered.  thanks!
>>
>> On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> I believe metrics are enabled by default even for a mini cluster. Which
>>> Flink version are you using and how do you set your watermark strategy?
>>> Could you share your user code about how to create the datastream / SQL and
>>> get the job graph?
>>>
>>> I'm also curious about why do you need to extract the output watermarks
>>> just for stopping the source. You can control the records and the watermark
>>> strategy from the source. From my point of view, constructing some test
>>> data with some specific row time would be enough.
>>>
>>> Jin Yi wrote on Tue, Nov 30, 2021 at 3:34 AM:
>>>
 bump.  a more general question is what do people do for more end to
 end, full integration tests to test event time based jobs with timers?

 On Tue, Nov 23, 2021 at 11:26 AM Jin Yi  wrote:

> i am writing an integration test where i execute a streaming flink job
> using faked, "unbounded" input where i want to control when the source
> function(s) complete by triggering them once the job's operator's maximum
> output watermarks are beyond some job completion watermark that's relative
> to the maximum input timestamp because the flink job uses event time 
> timers
> to produce some output.
>
> here is the faked, "unbounded" source function class:
>
>   private static class WaitingSourceFunction extends
> FromElementsFunction {
>
> private boolean isWaiting;
>
> private TypeInformation typeInfo;
>
>
> private WaitingSourceFunction(
>
> StreamExecutionEnvironment env, Collection data,
> TypeInformation typeInfo)
>
> throws IOException {
>
>   super(typeInfo.createSerializer(env.getConfig()), data);
>
>   this.isWaiting = true;
>
>   this.typeInfo = typeInfo;
>
> }
>
>
> @Override
>
> public void cancel() {
>
>   super.cancel();
>
>   isWaiting = false;
>
> }
>
>
> @Override
>
> public void run(SourceContext ctx) throws Exception {
>
>   super.run(ctx);
>
>   while (isWaiting) {
>
> TimeUnit.SECONDS.sleep(10);
>
>   }
>
> }
>
>
> public long getEndWatermark() {
>
>   

Re: Re: Question about Flink accessing multiple oss buckets

2021-12-01 Thread casel.chen









Can fs.oss.credentials.provider specify two different providers?
How can one distinguish the provider used for writing data from the provider used for
checkpoints/savepoints?










On 2021-12-01 10:58:18, "Caizhi Weng" wrote:
>Hi!
>
>If only the buckets differ, specifying the path in the with options is enough.
>
>If even the AK id and secret differ, consider implementing your own
>com.aliyun.oss.common.auth.CredentialsProvider interface and setting
>fs.oss.credentials.provider in the flink conf to that implementation class.
>
>casel.chen wrote on Wed, Dec 1, 2021 at 8:14 AM:
>
>> Jobs on our flink platform write data to the customer's oss bucket, which is not the
>> same oss bucket the platform jobs use for checkpoints/savepoints.
>> Does flink support this scenario, and if so, how should it be configured? Thanks!


Question about collecting data sets after flink sql group by

2021-12-01 Thread casel.chen
In our business we use a flink sql group by and then want to collect all the data in each group, as in the following example:


kafka source table:
class_no  student_no  name  age
1  20001  张三  15
2  20011  李四  16
1  20002  王五  16
2  20012  吴六  15


create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);


mongodb target table:
class_no  students
1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]


create table sink_table (
  class_no INT,
  students: ARRAY>
) with (
  'connector' = 'mongodb',
  ...
);


Looking through flink's built-in system functions, the only one that comes close is the
collect function:
insert into sink_table select class_no, collect(ROW(student_no, name, age)) from
source_table group by class_no;


However, it returns a Multiset type, i.e. a Map from the element to its count. If the key
type is ROW, as in my scenario, writing directly to mongodb throws an error because the
key type is automatically forced to STRING.
Besides, I only want to collect an Array[ROW], which is essentially just the keyset of
the Map, i.e. the deduplicated Array.


Three questions:
1. If I want to collect a deduplicated Array[ROW], is there any way to do it? I would
like it to be as generic as possible, collecting ROW values rather than a case-by-case
UDF for a specific type such as the ROW used here.
2. If I want to collect a non-deduplicated Array[ROW], how should that be written?
3. What flink sql syntax is used to access the keys and values of a value whose data type
is Map?


Thanks for any answers!

Question about files generated by streamFileSink under checkpoint

2021-12-01 Thread 黄志高
Hi everyone, I have a question.

My Flink version is 1.11.0. My program reads from kafka -> s3 with a checkpoint interval
of 10 minutes and no intermediate operations; it simply consumes data and writes it to the
file system using streamingFileSink with the internal bulk format builder. From the source
code, the rolling policy used is OnCheckpointRollingPolicy. However, within the hourly
buckets a partFile is always generated exactly on the hour. My checkpoints trigger at
minutes 02, 12, 22, 32, 42 and 52, and files are indeed generated at those times, but a
file is also always created exactly on the hour. I searched the source code and could not
find any logic that rolls files on the hour. Could someone help analyze where this
on-the-hour file comes from and which period it belongs to? Attached are the checkpoint
times of my Flink job and screenshots of the files generated at 1:00 and 2:00 on
November 30, 2021; a file was generated at minute 00 of both hours.

Re: Can a view be used as the dimension table in a flink sql lookup join?

2021-12-01 Thread Tony Wei
Hi,

Would two left joins satisfy your requirement?
Then, when reading fields from the temporal tables, use IF to decide which value to take.
See the reference SQL below:

SELECT
  c.mer_cust_id,
  IF(k.mer_cust_id IS NOT NULL AND a.mer_cust_id IS NOT NULL AND
k.mer_cust_id <> '', k.update_time, NULL) AS update_time
FROM charge_log as c
  LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF c.proc_time AS k
ON c.mer_cust_id = k.mer_cust_id
  LEFT JOIN adp_mer_user_info FOR SYSTEM_TIME AS OF c.proc_time AS a
ON c.mer_cust_id = a.mer_cust_id

However, this approach only works when both MySQL tables guarantee that mer_cust_id is a
unique primary key. If mer_cust_id is not unique, the number of output rows will differ
from what the originally provided SQL expects.
For example, if ka_mer_info has 0 rows and adp_mer_user_info has 2 rows, the original SQL
would produce 1 row where the left join did not match, whereas the SQL above would output
2 rows.


casel.chen wrote on Wed, Dec 1, 2021 at 6:33 PM:

> The dimension table used in the lookup join needs to be derived by joining two mysql
> tables, so I created a view. But flink sql does not seem to support a lookup join
> against a view; it throws:
> Temporal Table Join requires primary key in versioned table, but no
> primary key can be found.
>
>
> How can this be resolved?
>
>
> CREATE VIEW query_mer_view (mer_cust_id, update_time) AS
> SELECT a.mer_cust_id, k.update_time
> FROM ka_mer_info k INNER JOIN adp_mer_user_info a on k.mer_cust_id =
> a.mer_cust_id
> where k.mer_cust_id <> '';
>
>
> SELECT
> DATE_FORMAT(c.create_time, '-MM-dd') AS payment_date,
> c.mer_cust_id,
>
>
> c.trans_amt,
> CASE c.trans_stat WHEN 'S' THEN c.trans_amt ELSE 0 END as
> succ_amt ,
>
>
> 1 as trans_cnt,
> CASE c.trans_stat WHEN 'S' THEN 1 ELSE 0  END as succ_cnt ,
> CASE c.trans_stat WHEN 'F' THEN 1 ELSE 0  END as fail_cnt
>
>
> FROM charge_log as c
>  LEFT JOIN query_mer_view FOR SYSTEM_TIME AS OF
> c.proc_time AS q
>  ON c.mer_cust_id = q.mer_cust_id;


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Marco Villalobos
Thank you. One last question.  What is an RP? Where can I read it?

Marco

> On Nov 30, 2021, at 11:06 PM, Hang Ruan  wrote:
> 
> Hi,
> 
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint is 
> open is the default behavior in KafkaSourceBuilder. And it can not be changed 
> in KafkaSourceBuilder. 
> 
> By this FLINK-24277 , we 
> could change the behavior. This problem will be fixed in 1.12.6. It seems not 
> to be contained in your version.  
> 
> Reading the RP will be helpful for you to understand the behavior.
>  
> 
> Marco Villalobos wrote on Wed, Dec 1, 2021 at 3:43 AM:
> Thanks! 
> 
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does 
> not exist in Flink 1.12.
> 
> Is that property supported with the string "commit.offsets.on.checkpoints"?
> 
> How do I configure that behavior so that offsets get committed on checkpoints 
> in Flink 1.12 when using the KafkaSourceBuilder? Or is that the default 
> behavior with checkpoints?
> 
> 
> 
> 
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan  > wrote:
> Hi, 
> 
> Maybe you can write like this : 
> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), 
> "true");
> 
> Other additional properties could be found here : 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>  
> 
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 11:08 AM:
> Thank you for the information.  That still does not answer my question 
> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so 
> that consumer should commit offsets back to Kafka on checkpoints?
> 
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method. 
> 
> But now that I am using KafkaSourceBuilder, how do I configure that behavior 
> so that offsets get committed on checkpoints?  Or is that the default 
> behavior with checkpoints?
> 
> -Marco
> 
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  > wrote:
> Hi!
> 
> Flink 1.14 release note states about this. See [1].
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>  
> 
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:
> Hi everybody,
> 
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer to 
> using the KafkaSourceBuilder.
> 
> FlinkKafkaConsumer has the method 
> 
> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka 
> on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the 
> job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" 
> (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
> 
> How do I setup that parameter when using the KafkaSourceBuilder? If I already 
> have checkpointing configured, is it necessary to setup "commit offsets on 
> checkpoints"?
> 
> The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14 
> documentation says little about it.
> 
>  For example, the Flink 1.14 documentation states:
> 
> Additional Properties
> In addition to properties described above, you can set arbitrary properties 
> for KafkaSource and KafkaConsumer by using setProperties(Properties) and 
> setProperty(String, String). KafkaSource has following options for 
> configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets to 
> Kafka brokers on checkpoint
> 
> And the 1.12 documentation states:
> 
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume 
> records from a topic and periodically checkpoint all its Kafka offsets, 
> together with the state of other operations. In case of a job failure, Flink 
> will restore the streaming program to the state of the latest checkpoint and 
> re-consume the records from Kafka, starting from the offsets that were stored 
> in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program 
> may have to go back at most, in case of a failure. To use fault tolerant 
> Kafka Consumers, checkpointing of the topology needs to be enabled in the job.
> If checkpointing is disabled, the Kafka consumer will periodically commit the 
> offsets to Zookeeper.
> 
> Thank you.
> 
> Marco
> 
> 



Re: [DISCUSS] Deprecate Java 8 support

2021-12-01 Thread Chesnay Schepler

Hello Gavin,

If you run into any issues with Java 17, please report them in 
FLINK-15736 .
I recently did some experiments with Java 17 myself; I would think that 
you will run into some blockers (like ASM requiring an upgrade 
, or missing 
--add-opens/--add-exports 
).


On 01/12/2021 11:12, Gavin Lee wrote:

Thanks for sharing this info with us Chesnay.
We've been using Flink for 5 years,  and upgraded to 1.13.2 months 
ago. The java version is still 8.
Currently we're testing with java 17 in our staging environment. There 
are no special concerns.

Will update when tests complete.

On Tue, Nov 30, 2021 at 1:18 AM Chesnay Schepler  
wrote:


Hello,

we recently had a discussion on the dev mailing list for
deprecating support for Java 8 in 1.15, with a general consensus
in favor of it.

I now wanted to check in with you, our users, to see what you have
got to say about that.


Why are we interested in deprecating Java 8 support now
(and in eventually removing it)?

The main reason is that supporting the recently released Java 17
(and subsequent versions), while maintaining Java 8 support,
will be more complicated than if Java 11 were the oldest release
version. Essentially because Java 11/17 have both crossed the Java
9 chasm.

We will still have to bite this bullet in any case (because Java
17 is out now but we are not dropping Java 8 now), but we
would still
like to signal that users should upgrade to Java 11 so that we can
eventually clean this up.

Furthermore, it is currently hard to justify investing time into
benchmarks/performance improvements that are specific to Java 11+,
because
they provide no benefit to Java 8.


What does the deprecation mean exactly?

It will primarily mean that a warning will be logged when you run
Flink on Java 8.
We may change the default Java version of the Docker images to
Java 11 (the java8 tags will remain),
and we will put a larger emphasis on Flink's performance on Java 11.


Does that mean that Java 8 support will be removed in
1.16/1.17?

No. We are not putting a hard-date on the removal of Java 8
support at this time.


Will this mean that at some point we'll surprise you with
the removal of Java 8 support in the next release?

No. We will announce the removal ahead of time by at least half
a year / 2+ releases (probably closer to a full year).


Is the deprecation already decided?

No. The responses in this thread are integral for deciding whether
a deprecation at this time makes sense.


If you are still using Java 8 at the moment, then we would
appreciate if you could tell us whether you already have a
time-frame for
when you intend to upgrade to Java 11. We'd also be interested in
anything that blocks your migration to Java 11.


Please raise concerns you have, and feel free to ask questions.



--
Gavin




Can a view be used as the dimension table in a flink sql lookup join?

2021-12-01 Thread casel.chen
The dimension table used in the lookup join needs to be derived by joining two mysql
tables, so I created a view. But flink sql does not seem to support a lookup join against
a view; it throws:
Temporal Table Join requires primary key in versioned table, but no primary key
can be found.


How can this be resolved?


CREATE VIEW query_mer_view (mer_cust_id, update_time) AS
SELECT a.mer_cust_id, k.update_time
FROM ka_mer_info k INNER JOIN adp_mer_user_info a on k.mer_cust_id = 
a.mer_cust_id
where k.mer_cust_id <> '';


SELECT
DATE_FORMAT(c.create_time, '-MM-dd') AS payment_date,
c.mer_cust_id,


c.trans_amt,
CASE c.trans_stat WHEN 'S' THEN c.trans_amt ELSE 0 END as succ_amt ,


1 as trans_cnt,
CASE c.trans_stat WHEN 'S' THEN 1 ELSE 0  END as succ_cnt ,
CASE c.trans_stat WHEN 'F' THEN 1 ELSE 0  END as fail_cnt


FROM charge_log as c
 LEFT JOIN query_mer_view FOR SYSTEM_TIME AS OF c.proc_time AS q
 ON c.mer_cust_id = q.mer_cust_id;

Mapping between Flink checkpoint file size and the corresponding in-memory state size

2021-12-01 Thread mayifan
Hi, all!

A question for everyone: has anyone looked into the relationship between the checkpoint
file size and the size of the corresponding state data in memory?

For example, if the checkpoint file size with the Fs state backend is 1 MB, roughly how
much memory does the corresponding state data occupy?

Thanks for any replies!

Re: [DISCUSS] Deprecate Java 8 support

2021-12-01 Thread Gavin Lee
Thanks for sharing this info with us Chesnay.
We've been using Flink for 5 years,  and upgraded to 1.13.2 months ago. The
java version is still 8.
Currently we're testing with java 17 in our staging environment. There are
no special concerns.
Will update when tests complete.


On Tue, Nov 30, 2021 at 1:18 AM Chesnay Schepler  wrote:

> Hello,
>
> we recently had a discussion on the dev mailing list for deprecating
> support for Java 8 in 1.15, with a general consensus in favor of it.
>
> I now wanted to check in with you, our users, to see what you have got to
> say about that.
>
> Why are we interested in deprecating Java 8 support now (and in eventually
> removing it)?
>
> The main reason is that supporting the recently released Java 17 (and
> subsequent versions), while maintaining Java 8 support,
> will be more complicated than if Java 11 were the oldest release version.
> Essentially because Java 11/17 have both crossed the Java 9 chasm.
>
> We will still have to bite this bullet in any case (because Java 17 is out 
> *now
> *but we are *not *dropping Java 8 *now*), but we would still
> like to signal that users should upgrade to Java 11 so that we can
> *eventually* clean this up.
>
> Furthermore, it is currently hard to justify investing time into
> benchmarks/performance improvements that are specific to Java 11+, because
> they provide no benefit to Java 8.
> What does the deprecation mean exactly?
>
> It will primarily mean that a warning will be logged when you run Flink on
> Java 8.
> We *may* change the default Java version of the Docker images to Java 11
> (the java8 tags will remain),
> and we will put a larger emphasis on Flink's performance on Java 11.
> Does that mean that Java 8 support will be removed in 1.16/1.17?
>
> No. We are not putting a hard-date on the removal of Java 8 support at
> this time.
> Will this mean that at some point we'll surprise you with the removal of
> Java 8 support in the next release?
>
> No. We will announce the removal ahead of time by *at least* half a year
> / 2+ releases (probably closer to a full year).
> Is the deprecation already decided?
>
> No. The responses in this thread are integral for deciding whether a
> deprecation at this time makes sense.
>
>
> If you are still using Java 8 at the moment, then we would appreciate if
> you could tell us whether you already have a time-frame for
> when you intend to upgrade to Java 11. We'd also be interested in anything
> that blocks your migration to Java 11.
>
>
> Please raise concerns you have, and feel free to ask questions.
>


-- 
Gavin


Re: how to run streaming process after batch process is completed?

2021-12-01 Thread vtygoss
Hi Alexander,


This is my ideal data pipeline.
- 1. Sqoop transfers bounded data from the database to hive. Since I think flink batch
processing is more efficient than streaming, I want to process this bounded data in batch
mode and write the result to HiveTable2.
- 2. There are some tools that transfer CDC / BINLOG to kafka and write the incremental
unbounded data to HiveTable1. I want to process this unbounded data in streaming mode and
update the incremental result in HiveTable2.


So this is the problem: the flink streaming sql application cannot be restored from the
batch process application. E.g. SQL: insert into table_2 select count(1) from table_1. In
batch mode, the result stored in table_2 is N, and I expect the accumulator to start from
N, not 0, when the streaming process starts.


Thanks for your reply. 


Best Regard!


(sending again because I accidentally left out the user ml in the reply on the 
first try)...


On Nov 30, 2021 at 21:42, Alexander Preuß wrote:


Hi Vtygoss, 

Can you explain a bit more about your ideal pipeline? Is the batch data bounded 
data or could you also process it in streaming execution mode? And is the 
streaming data derived from the batch data or do you just want to ensure that 
the batch has been finished before running the processing of the streaming data?

Best Regards,
Alexander

(sending again because I accidentally left out the user ml in the reply on the 
first try)


On Tue, Nov 30, 2021 at 12:38 PM vtygoss  wrote:

Hi, community!


With Flink, I want to unify batch processing and stream processing in a data production
pipeline. Batch processing handles the existing (inventory) data, then stream processing
handles the incremental data. But I have a problem: there is no state after the batch run,
and the result is wrong if I run the stream processing directly.


So how can I run the streaming process accurately after the batch process is completed?
Is there any doc or demo for handling this scenario?


Thanks for your any reply or suggestion!


Best Regards!

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-01 Thread Yingjie Cao
Hi Jiangang,

Great to hear that, welcome to work together to make the project better.

Best,
Yingjie

刘建刚 wrote on Wed, Dec 1, 2021 at 3:27 PM:

> Good work for flink's batch processing!
> Remote shuffle service can resolve the container lost problem and reduce
> the running time for batch jobs once failover. We have investigated the
> component a lot and welcome Flink's native solution. We will try it and
> help improve it.
>
> Thanks,
> Liu Jiangang
>
> Yingjie Cao wrote on Tue, Nov 30, 2021 at 9:33 PM:
>
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1]
> for
> > Flink. The project is originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and
> further
> > embrace cloud native. For more features about the project, please refer
> to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaborations and feedbacks are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
>



Re:Re: Question about collecting data after a flink sql group by

2021-12-01 Thread casel.chen



What I want is a generic way to collect ROW-typed values into a collection (an 
ARRAY, both deduplicated and not), not something tied to one specific ROW type. 
Writing @DataTypeHint("ROW") works fine, but writing @DataTypeHint("ROW") 
throws an error.
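
Something like the following is what I have in mind - a rough, untested sketch using Flink 1.14 type hints; the field names student_no / name / age are only taken from my example below and the class name is made up:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.Arrays;

// Collects the grouped rows into an ARRAY<ROW<...>>, deduplicating as it goes.
@FunctionHint(
        input = @DataTypeHint("ROW<student_no INT, name STRING, age INT>"),
        output = @DataTypeHint("ARRAY<ROW<student_no INT, name STRING, age INT>>"))
public class CollectRowArray
        extends AggregateFunction<Row[], CollectRowArray.RowArrayAccumulator> {

    // Accumulator: the rows collected so far for one group.
    public static class RowArrayAccumulator {
        @DataTypeHint("ARRAY<ROW<student_no INT, name STRING, age INT>>")
        public Row[] rows = new Row[0];
    }

    @Override
    public RowArrayAccumulator createAccumulator() {
        return new RowArrayAccumulator();
    }

    // Called once per input row of the group; drop the contains() check to keep duplicates.
    public void accumulate(RowArrayAccumulator acc, Row row) {
        if (Arrays.asList(acc.rows).contains(row)) {
            return;
        }
        Row[] next = Arrays.copyOf(acc.rows, acc.rows.length + 1);
        next[acc.rows.length] = row;
        acc.rows = next;
    }

    @Override
    public Row[] getValue(RowArrayAccumulator acc) {
        return acc.rows;
    }
}

If something like this works, I would register it under a name such as collect_row_array (via createTemporarySystemFunction or CREATE TEMPORARY FUNCTION) and call it as collect_row_array(ROW(student_no, name, age)) after group by class_no; as far as I understand, retract/merge methods may additionally be required depending on the query plan.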

On 2021-12-01 11:12:27, "Caizhi Weng" wrote:
>Hi!
>
>UDFs do support the ROW type; see the ROW examples in [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen wrote on Wed, Dec 1, 2021 at 7:56 AM:
>
>> In our business we use a flink sql group by and then want to collect all the
>> rows of each group, as in the following example:
>>
>>
>> kafka source table:
>> class_no  student_no  name  age
>> 1         20001       张三  15
>> 2         20011       李四  16
>> 1         20002       王五  16
>> 2         20012       吴六  15
>>
>>
>> create table source_table (
>>    class_no INT,
>>    student_no INT,
>>    name STRING,
>>    age INT
>> ) with (
>>    'connector' = 'kafka',
>>    ...
>> );
>>
>>
>> mongodb target table:
>> class_no  students
>> 1         [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2         [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students ARRAY<ROW<student_no INT, name STRING, age INT>>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> Looking at Flink's built-in system functions, the only one that comes close
>> is the collect function.
>> insert into sink_table select class_no, collect(ROW(student_no, name, age))
>> from source_table group by class_no;
>>
>>
>> But collect returns a Multiset, i.e. a Map<ROW, Integer>. When the key type
>> is ROW, as in my scenario, writing directly to mongodb throws an error,
>> because the key is forcibly converted to STRING. Besides, I only want to
>> collect an Array[ROW], which is essentially just the key set of the Map,
>> i.e. the deduplicated Array.
>>
>>
>> 1. If I want to collect a deduplicated Array[ROW], is there any way to do
>> it? I tried writing a UDF, but it seemed UDFs do not support the ROW type,
>> only concrete data types; any suggestions or reference examples?
>> 2. If I want to collect a non-deduplicated Array[ROW], how should that be
>> written?
>> 3. What flink sql syntax should be used to access the keys and values of a
>> Map-typed column?
>>
>>
>> Thanks for any answers!


Watermark behavior when connecting streams

2021-12-01 Thread Alexis Sarda-Espinosa
Hi everyone,

Based on what I know, an operator with parallelism > 1 checks the watermarks from 
all of its input streams and uses the smallest one among the non-idle ones. My 
first question is whether watermarks are forwarded downstream as long as a 
different watermark strategy is not applied. For example, will my stream keep 
its watermarks even after windowing + processing?

The second question is what happens with watermarks after connecting 2 streams, 
specifically these 2 possibilities:

- One stream has watermarks but the other one doesn't.
- Both streams have watermarks.
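
To make the second question concrete, here is a small, untested sketch (DataStream API) of what I mean: connecting a stream that has watermarks with one that doesn't, and probing the operator's current watermark (my assumption being that it is the minimum over both inputs):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class ConnectedWatermarkProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stream 1: carries event-time watermarks (timestamp is the second tuple field).
        DataStream<Tuple2<String, Long>> withWatermarks = env
                .fromElements(Tuple2.of("a", 1_000L), Tuple2.of("b", 60_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.f1));

        // Stream 2: no watermark strategy at all.
        DataStream<String> withoutWatermarks = env.fromElements("x", "y");

        withWatermarks
                .connect(withoutWatermarks)
                .process(new CoProcessFunction<Tuple2<String, Long>, String, String>() {
                    @Override
                    public void processElement1(Tuple2<String, Long> value, Context ctx, Collector<String> out) {
                        // The two-input operator tracks the minimum watermark over both inputs,
                        // so this stays at Long.MIN_VALUE while input 2 provides no watermarks
                        // (bounded sources do emit a final MAX_VALUE watermark when they finish).
                        out.collect("from input 1, watermark=" + ctx.timerService().currentWatermark());
                    }

                    @Override
                    public void processElement2(String value, Context ctx, Collector<String> out) {
                        out.collect("from input 2, watermark=" + ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute("connected-watermark-probe");
    }
}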

Regards,
Alexis.



RE: Windows and data loss.

2021-12-01 Thread Schwalbe Matthias
Hi John,

Sorry for the delay … I’m a little tight on spare time for user@flink currently.
If you are still interested we could pick up the discussion and continue.
However, I don't exactly understand what you want to achieve:

  1.  Would processing time windows be enough for you (and misplacement of 
events into the wrong window acceptable)?
  2.  Do you want to use event time windows, but cannot afford losing late 
events? (we can work out a scheme so that this would work)
  3.  How do you currently organize your input events in kafka?
     *   1 event per log row?
     *   Kafka-event timestamp extracted from/per the log row?
     *   You mentioned shuffling (random assignment) to kafka partitions:
          i.   Is this per log row, or per log file?
          ii.  Do you kafka-key by log file, or even by log application?
     *   Do you select log files to be collected in file timestamp order?
  4.  I assume your windows are keyed by application, or do you use another 
keyBy()?
  5.  What watermarking strategy did you configure?
     *   You mentioned that watermarks advance even if file-ingress is blocked
     *   Can you publish/share the 3 odd lines of code for your watermark 
strategy setup?

Just as said before, ignoring late events is the default strategy; it can be 
adjusted by means of a custom window trigger, which trades off latency, 
state size, and correctness of the final results.
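
To illustrate the accepted-lateness part (not the custom trigger itself), here is a rough, untested sketch in the DataStream API; LogEvent, the application key and the 5-minute figures are only assumptions based on your example, and the input stream is assumed to already have timestamps/watermarks assigned:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateEventCounting {

    // Assumed event type: one log row with an application id and an event timestamp.
    public static class LogEvent {
        public String application;
        public long timestamp;
    }

    // Side output for events that are too late even for the accepted lateness,
    // so they are not silently dropped and can be reconciled later.
    public static final OutputTag<LogEvent> TOO_LATE = new OutputTag<LogEvent>("too-late") {};

    // Simple count aggregate; an updated count is emitted on every (late) window firing.
    public static class CountAggregate implements AggregateFunction<LogEvent, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(LogEvent value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    public static SingleOutputStreamOperator<Long> countPerWindow(DataStream<LogEvent> events) {
        return events
                .keyBy(e -> e.application)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .allowedLateness(Time.minutes(5))   // keep window state 5 extra minutes
                .sideOutputLateData(TOO_LATE)       // later events go to the side output
                .aggregate(new CountAggregate());
    }
}

The main output then carries per-window counts (re-emitted whenever a late event fires the window again within the allowed lateness), and countPerWindow(events).getSideOutput(TOO_LATE) carries the events that missed even that, so the following operator can apply an idempotent update or reconciliation.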

Thias

From: John Smith 
Sent: Friday, 26 November 2021 17:17
To: Schwalbe Matthias 
Cc: Caizhi Weng ; user 
Subject: Re: Windows and data loss.

Or as an example, we have a 5-minute window and a lateness of 5 minutes.

We have the following events in the logs
10:00:01 PM > Already pushed to Kafka
10:00:30 PM > Already pushed to Kafka
10:01:00 PM > Already pushed to Kafka
10:03:45 PM > Already pushed to Kafka
10:04:00 PM > Log agent crashed for 30 minutes, not delivered to Kafka yet
10:05:10 PM > Pushed to Kafka cause I came from a log agent that isn't dead.

Flink window of 10:00:00
10:00:01 PM > Received
10:00:30 PM > Received
10:01:00 PM > Received
10:03:45 PM > Received
10:04:00 PM > Still nothing

Flink window of 10:00:00, the 5 minutes of lateness are up.
10:00:01 PM > Counted
10:00:30 PM > Counted
10:01:00 PM > Counted
10:03:45 PM > Counted
10:04:00 PM > Still nothing

Flink window of 10:05:00 started
10:05:10 PM > I'm new cause I came from a log agent that isn't dead.
10:04:00 PM > Still nothing

Flink window of 10:05:00, the 5 minutes of lateness are up.
10:05:10 PM > I have been counted, I'm happy!
10:04:00 PM > Still nothing

And so on...

Flink window of 10:30:00 started
10:04:00 PM > Hi guys, sorry I'm late 30 minutes, I ran into log agent 
problems. Sorry you are late, you missed the Flink bus.

On Fri, 26 Nov 2021 at 10:53, John Smith <java.dev@gmail.com> wrote:
Ok,

So with processing time we get 100% accuracy because we don't care when the event 
comes; we just count and move along.
As for event time processing, what I meant to say is: if, for example, the log 
shipper is late at pushing events into Kafka, Flink will not notice this and the 
watermarks will keep advancing. Given that, say we have a window of 5 minutes and 
a lateness of 5 minutes; it means we will see counts on the "dashboard" every 10 
minutes. But if the log shipper fails or falls behind for 30 minutes or more, the 
Flink Kafka consumer will simply not see any events and will continue chugging 
along; when an event finally comes in 30 minutes late, already a couple of 
windows too late, that event is discarded.

Or did I miss the point on the last part?


On Fri, 26 Nov 2021 at 09:38, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Actually not, because processing time does not matter at all.
Event-time timers are always compared to watermark progress.
If the system happens to be down for (say) 4 hours, the watermarks won't 
progress either; hence the windows are not evicted and wait for the watermarks 
to pick up from where the system crashed.

Your watermark strategy can decide how strictly you handle time progress:

  *   Super strict: the watermark time indicates that there will be no events 
with an older timestamp
  *   Semi strict: you accept late events and give a time-range within which this 
can happen (processing time still put aside)
     *   You need to configure acceptable lateness in your windowing operator
     *   Accepted lateness implies higher overall latency
  *   Custom strategy
     *   Use a combination of accepted lateness and a custom trigger in your 
windowing operator
     *   The trigger decides when and how often window results are emitted
     *   The following operator would then probably implement some 
idempotence/updating scheme for the window values
     *   This way you get immediate low latency results and allow for