Re: AsyncIO 用Redis做缓存

2019-08-06 文章 戴嘉诚
你好,
可以用lettuce做异步客户端,排除lettuce的netty依赖,用flink的netty,就可以了集成lettuce了

王佩 于2019年8月6日 周二22:11写道:

> 这种Join场景,用上缓存后,理论上应该更快,但为啥会变慢呢。
>
> 王佩  于2019年8月6日周二 下午10:09写道:
>
> > 需求: 事实表实时Join Kudu中的维度表,用来补全维度。
> >
> > 为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。
> >
> > 遇到的问题:
> > 1、不用Redis缓存,checkpoint很快,效率很高。
> > 2、用Redis缓存,用Jedis,但不用连接池,效率很低。
> > 3、用Redis缓存,用Redis连接池,效率更低。
> >
> > 请教下:
> > 1、从Kudu中取数据,不用缓存可以吗。
> > 2、在AsyncIO中,用lettuce异步客户端,和flink netty不兼容。
> > 3、在AsyncIO中,用Jedis连接池,flink checkpoint很慢的原因。
> > 3、像我这种场景: 流(实时表) Join Kudu中的维度表,怎么才能更好的提高性能。
> >
> > 烦请解答下,辛苦,感谢。
> >
> >
> >
>


Re: AsyncIO 用Redis做缓存

2019-08-06 文章 王佩
这种Join场景,用上缓存后,理论上应该更快,但为啥会变慢呢。

王佩  于2019年8月6日周二 下午10:09写道:

> 需求: 事实表实时Join Kudu中的维度表,用来补全维度。
>
> 为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。
>
> 遇到的问题:
> 1、不用Redis缓存,checkpoint很快,效率很高。
> 2、用Redis缓存,用Jedis,但不用连接池,效率很低。
> 3、用Redis缓存,用Redis连接池,效率更低。
>
> 请教下:
> 1、从Kudu中取数据,不用缓存可以吗。
> 2、在AsyncIO中,用lettuce异步客户端,和flink netty不兼容。
> 3、在AsyncIO中,用Jedis连接池,flink checkpoint很慢的原因。
> 3、像我这种场景: 流(实时表) Join Kudu中的维度表,怎么才能更好的提高性能。
>
> 烦请解答下,辛苦,感谢。
>
>
>


AsyncIO 用Redis做缓存

2019-08-06 文章 王佩
需求: 事实表实时Join Kudu中的维度表,用来补全维度。

为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。

遇到的问题:
1、不用Redis缓存,checkpoint很快,效率很高。
2、用Redis缓存,用Jedis,但不用连接池,效率很低。
3、用Redis缓存,用Redis连接池,效率更低。

请教下:
1、从Kudu中取数据,不用缓存可以吗。
2、在AsyncIO中,用lettuce异步客户端,和flink netty不兼容。
3、在AsyncIO中,用Jedis连接池,flink checkpoint很慢的原因。
3、像我这种场景: 流(实时表) Join Kudu中的维度表,怎么才能更好的提高性能。

烦请解答下,辛苦,感谢。


Re: submit jobGraph error on server side

2019-08-06 文章 Zili Chen
问题是 Ask timed out on [Actor[akka://flink/user/dispatcher#-273192824]] after
[1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.
messages.LocalFencedMessage".

也就是 submit job 的时候在请求 Dispatcher 的时候 akka ask timeout
了,可以检查一下配置的地址和端口是否正确,或者贴出你的相关配置。

Best,
tison.


王智  于2019年8月6日周二 下午7:13写道:

> 向session cluster 提交job 出错,麻烦各位老师帮忙看下,给点排查提示 THX~
>
>
>
>
> 环境:
>
> blink 1.8.0
>
> 用docker 方式启动的flink session cluster,flink 集群独立,我从集群外的一个docker
> 节点提交job(该节点的flink-conf.yaml 配置与flink 集群内的配置一致)
>
>
>
>
> --
>
>
> 报错信息:
>
> 
>
> The program finished with the following exception:
>
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 82
>
> 3a336683f6476b3e7ee2780c33395b)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>
> Caused by: java.lang.RuntimeException:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)
>
> at
> com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:176)
>
> at
> com.xx.data.platform.pandora.flink.EntryPoint.main(EntryPoint.java:78)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at
> java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>
> ... 9 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> Could not retrieve the execution result. (JobID:
> 823a336683f6476b3e7ee2780c33395b)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
>
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
>
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>
> at
> com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:174)
>
> ... 15 more
>
>
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
>
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server 

submit jobGraph error on server side

2019-08-06 文章 王智
向session cluster 提交job 出错,麻烦各位老师帮忙看下,给点排查提示 THX~




环境:

blink 1.8.0

用docker 方式启动的flink session cluster,flink 集群独立,我从集群外的一个docker 
节点提交job(该节点的flink-conf.yaml 配置与flink 集群内的配置一致)




--


报错信息:



The program finished with the following exception:




org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Could not retrieve the execution result. (JobID: 82

3a336683f6476b3e7ee2780c33395b)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)

at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)

at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)

at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)

at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Caused by: java.lang.RuntimeException: 
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)

at 
com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:176)

at 
com.xx.data.platform.pandora.flink.EntryPoint.main(EntryPoint.java:78)

at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)

... 9 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Could 
not retrieve the execution result. (JobID: 823a336683f6476b3e7ee2780c33395b)

at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)

at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)

at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)

at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)

at 
com.xx.data.platform.pandora.flink.table.BatchSqlRunner.run(BatchSqlRunner.java:174)

... 15 more



Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.

at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)

at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)

at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)

at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)

at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., 

任务异常捕获疑问

2019-08-06 文章 zq wang
各位大佬好,
请问是否可以像下面这样捕获job异常, 如果不行应当怎么处理,或者有没有更好的方式?
谢谢...

try {
env.execute("jobName");
} catch (Throwable e) {
*//log..报警*
}


Re: Re:Re: Re: Flink RocksDBStateBackend 问题

2019-08-06 文章 Yun Tang

  1.  
你对增量checkpoint的理解以及taskmanager和RocksDB之间的关系理解不太对。RocksDBKeyedStateBackend使用RocksDB存储state数据,可以理解成其是taskmanager的一部分,实际上是单机的概念。增量checkpoint的时候,RocksDB会在同步阶段将其所有数据刷写到磁盘上,Flink框架选择之前没有上传的sst文件,异步上传到HDFS。如果没有开启local
 recovery,那么新启动的taskmanager会从hdfs上下载全量的数据文件进行恢复。
  2.  作业cancel了,task都会cancel,RocksDBKeyedStateBackend自然也会退出。

祝好
唐云

From: lvwenyuan 
Sent: Tuesday, August 6, 2019 14:53
To: user-zh@flink.apache.org 
Subject: Re:Re: Re: Flink RocksDBStateBackend 问题

唐老师您好:
 我这里指的是checkpoint时存储数据的file system,这里我用的是HDFS。
按照老师的说法,我可不可以这样理解(在Flink on yarn 以及 使用 RocksDBStateBackend 的场景下):
   1.做增量checkpoint的时候,taskmanager默认异步的将数据写入rocksdb和hdfs中(数据是相同的)。
   
当taskmanager异常退出时,会启动另一个taskmanager去做task,那么新起的taskmanager是否会去hdfs上同步数据。
   那么这个rocksdb的意义是在哪里?数据反正HDFS上都有,而且HDFS本身也多副本。
   2.在做savepoint的时候,一般会将数据保存在HDFS上,那么这个是,我的命令是  flink cancel -s  
。做完savepoint就退出,那么这个时候rocksdb还需要去写数据吗?因为做完savepoint,整个任务就结束了。


  望解答,谢谢老师!







在 2019-08-06 13:44:18,"Yun Tang"  写道:
>@lvwenyuan
>首先需要明确的一点是,你这里的“FileSystem”指的究竟是checkpoint时存储数据的file 
>system,还是FsStateBackend,建议下次提问前可以把需要咨询的内容表述清楚一些。
>
>  *   如果指的是存储checkpoint数据的远程file system,在incremental 
> checkpoint场景下,这些数据与RocksDB的创建checkpoint时刷写到本地的sst文件和meta文件是二进制相同的,只是文件名会重命名。如果是savepoint或者全量checkpoint场景下,这些数据是RocksDB中逐个有效entry的序列化内容。
>  *   
> 如果指的是FsStateBackend,对于Flink而言存储的数据内容在逻辑上肯定都是一样的,否则就不符合语义了。但是二者在数据存储格式上是有区别的。FsStateBackend所创建的HeapKeyedStateBackend的数据内容都是存储在Java
>  heap内的,基本数据格式是StateTable[1]和其中存储数据的StateMap[2]。而RocksDB存储的数据主要是RocksDB 
> native内存中的writer buffer(memtable),block 
> cache,index[3]和已经刷写到磁盘上默认采用snappy压缩的不可变sst文件构成。
>
>[1] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>[2] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMap.java
>[3] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>
>
>祝好
>唐云
>
>
>From: 戴嘉诚 
>Sent: Tuesday, August 6, 2019 12:01
>To: user-zh@flink.apache.org 
>Subject: 答复: Re: Flink RocksDBStateBackend 问题
>
>
> 不是,文档上有说,filesystem是会把正在运行的数据存储在tm的内存中,然后触发checkpoint后,才会写入文件系统上,而rocksdb是直接把运行中的数据写到了rocksdb上,看样子是不占用运行中的tm的内存。
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#the-fsstatebackend
>
>`The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon 
>checkpointing, it writes state snapshots into files in the configured file 
>system and directory. Minimal metadata is stored in the JobManager’s memory 
>(or, in high-availability mode, in the metadata checkpoint).`
>
>发件人: athlon...@gmail.com
>发送时间: 2019年8月6日 11:53
>收件人: user-zh
>主题: Re: Re: Flink RocksDBStateBackend 问题
>
>你说的是memsystem的状态数据存在jm内存中的filesystem是存到文件系统上的
>
>
>
>athlon...@gmail.com
>发件人: 戴嘉诚
>发送时间: 2019-08-06 11:42
>收件人: user-zh
>主题: Re: Flink RocksDBStateBackend 问题
>FileSystem 我记得是存储的大小是不能超过tm的内存还是jm的内存,而rocksdb上存储的数据是可以无限的,不过相对来说,
>FileSystem的吞吐就会比rocksdb会高
>lvwenyuan  于2019年8月6日周二 上午11:39写道:
>> 请教各位:
>>RocksDBStateBackend
>> 中,rocksdb上存储的内如和FileSystem上存储的数据内容是一样的?如果不一样,那么分别是什么呢?感谢回答
>>
>>
>>
>>
>


答复: jobmanager 日志异常

2019-08-06 文章 戴嘉诚
你好,
谢谢!已经找到原因了 

发件人: Biao Liu
发送时间: 2019年8月6日 13:55
收件人: user-zh
主题: Re: jobmanager 日志异常

你好,

> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.

这是收到了 signal 15 了 [1],Wong 说得对,搜一下 yarn node manager 或者 yarn resource
manager 的 log

1. https://access.redhat.com/solutions/737033

Thanks,
Biao /'bɪ.aʊ/



On Tue, Aug 6, 2019 at 12:30 PM Wong Victor 
wrote:

> Hi,
>   可以查看一下jobmanager所在节点的yarn log,搜索一下对应的container为什么被kill;
>
> Regards
>
> On 2019/8/6, 11:40 AM, "戴嘉诚"  wrote:
>
> 大家好:
>
>
>
> 我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full
> gc和频繁的gc,以下是jobmanager的日志:
> 就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢?
>
> 2019-08-06 06:43:58,891 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7843 for job e49624208fe771c4c9527799fd46f2a3 (5645215
> bytes in
> > 801 ms).
> > 2019-08-06 06:43:59,336 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045039321 for job
> a9a7464ead55474bea6f42ed8e5de60f.
> > 2019-08-06 06:44:00,971 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045040957 for job
> 79788b218e684cb31c1ca0fcc641e89f.
> > 2019-08-06 06:44:01,357 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job a9a7464ead55474bea6f42ed8e5de60f (25870658
> bytes in
> > 1806 ms).
> > 2019-08-06 06:44:02,887 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 79788b218e684cb31c1ca0fcc641e89f (29798945
> bytes in
> > 1849 ms).
> > 2019-08-06 06:44:05,101 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045045092 for job
> 03f3a0bd53c21f90f70ea01916dc9f78.
> > 2019-08-06 06:44:06,547 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7844 @ 1565045046522 for job
> 486a1949d75863f823013d87b509d228.
> > 2019-08-06 06:44:07,311 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7844 for job 486a1949d75863f823013d87b509d228 (62458942
> bytes in
> > 736 ms).
> > 2019-08-06 06:44:07,506 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 03f3a0bd53c21f90f70ea01916dc9f78 (105565032
> bytes
> > in 2366 ms).
> > 2019-08-06 06:44:08,087 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045048055 for job
> 32783d371464265ef536454055ae6182.
> > 2019-08-06 06:44:09,626 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Checkpoint
> > 7050 of job 4b542195824ff7b7cdf749543fd368cb expired before
> completing.
> > 2019-08-06 06:44:09,647 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7051 @ 1565045049626 for job
> 4b542195824ff7b7cdf749543fd368cb.
> > 2019-08-06 06:44:12,006 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 32783d371464265ef536454055ae6182 (299599482
> bytes
> > in 3912 ms).
> > 2019-08-06 06:44:12,972 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045052962 for job
> 16db5afe9a8cd7c6278030d5dec4c80c.
> > 2019-08-06 06:44:13,109 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045053080 for job
> 9c1394a2d2ff47c7852eff9f1f932535.
> > 2019-08-06 06:44:16,779 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 16db5afe9a8cd7c6278030d5dec4c80c (152643149
> bytes
> > in 3666 ms).
> > 2019-08-06 06:44:18,598 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7828 for job 8df2b47f2a4c1ba0f7019ee5989f6e71 (837558245
> bytes
> > in 23472 ms).
> > 2019-08-06 06:44:19,193 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 9c1394a2d2ff47c7852eff9f1f932535 (594628825
> bytes
> > in 6067 ms).
> > 2019-08-06 06:44:19,238 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 5855 for job 108ce7f6f5f3e76b12fad9dbdbc8feba (45917615
> bytes in
> > 61819 ms).
> > 2019-08-06 06:44:19,248 INFO
> >  

Re:Re: Re: Flink RocksDBStateBackend 问题

2019-08-06 文章 lvwenyuan
唐老师您好:
 我这里指的是checkpoint时存储数据的file system,这里我用的是HDFS。
按照老师的说法,我可不可以这样理解(在Flink on yarn 以及 使用 RocksDBStateBackend 的场景下): 
   1.做增量checkpoint的时候,taskmanager默认异步的将数据写入rocksdb和hdfs中(数据是相同的)。
   
当taskmanager异常退出时,会启动另一个taskmanager去做task,那么新起的taskmanager是否会去hdfs上同步数据。
   那么这个rocksdb的意义是在哪里?数据反正HDFS上都有,而且HDFS本身也多副本。
   2.在做savepoint的时候,一般会将数据保存在HDFS上,那么这个是,我的命令是  flink cancel -s  
。做完savepoint就退出,那么这个时候rocksdb还需要去写数据吗?因为做完savepoint,整个任务就结束了。


  望解答,谢谢老师!







在 2019-08-06 13:44:18,"Yun Tang"  写道:
>@lvwenyuan
>首先需要明确的一点是,你这里的“FileSystem”指的究竟是checkpoint时存储数据的file 
>system,还是FsStateBackend,建议下次提问前可以把需要咨询的内容表述清楚一些。
>
>  *   如果指的是存储checkpoint数据的远程file system,在incremental 
> checkpoint场景下,这些数据与RocksDB的创建checkpoint时刷写到本地的sst文件和meta文件是二进制相同的,只是文件名会重命名。如果是savepoint或者全量checkpoint场景下,这些数据是RocksDB中逐个有效entry的序列化内容。
>  *   
> 如果指的是FsStateBackend,对于Flink而言存储的数据内容在逻辑上肯定都是一样的,否则就不符合语义了。但是二者在数据存储格式上是有区别的。FsStateBackend所创建的HeapKeyedStateBackend的数据内容都是存储在Java
>  heap内的,基本数据格式是StateTable[1]和其中存储数据的StateMap[2]。而RocksDB存储的数据主要是RocksDB 
> native内存中的writer buffer(memtable),block 
> cache,index[3]和已经刷写到磁盘上默认采用snappy压缩的不可变sst文件构成。
>
>[1] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>[2] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMap.java
>[3] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>
>
>祝好
>唐云
>
>
>From: 戴嘉诚 
>Sent: Tuesday, August 6, 2019 12:01
>To: user-zh@flink.apache.org 
>Subject: 答复: Re: Flink RocksDBStateBackend 问题
>
>
> 不是,文档上有说,filesystem是会把正在运行的数据存储在tm的内存中,然后触发checkpoint后,才会写入文件系统上,而rocksdb是直接把运行中的数据写到了rocksdb上,看样子是不占用运行中的tm的内存。
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#the-fsstatebackend
>
>`The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon 
>checkpointing, it writes state snapshots into files in the configured file 
>system and directory. Minimal metadata is stored in the JobManager’s memory 
>(or, in high-availability mode, in the metadata checkpoint).`
>
>发件人: athlon...@gmail.com
>发送时间: 2019年8月6日 11:53
>收件人: user-zh
>主题: Re: Re: Flink RocksDBStateBackend 问题
>
>你说的是memsystem的状态数据存在jm内存中的filesystem是存到文件系统上的
>
>
>
>athlon...@gmail.com
>发件人: 戴嘉诚
>发送时间: 2019-08-06 11:42
>收件人: user-zh
>主题: Re: Flink RocksDBStateBackend 问题
>FileSystem 我记得是存储的大小是不能超过tm的内存还是jm的内存,而rocksdb上存储的数据是可以无限的,不过相对来说,
>FileSystem的吞吐就会比rocksdb会高
>lvwenyuan  于2019年8月6日周二 上午11:39写道:
>> 请教各位:
>>RocksDBStateBackend
>> 中,rocksdb上存储的内如和FileSystem上存储的数据内容是一样的?如果不一样,那么分别是什么呢?感谢回答
>>
>>
>>
>>
>


Flink sql join问题

2019-08-06 文章 huang
Hi all,


请问用Flink sql做双流join。如果希望两个流都只保存每个key的最新的数据,这样相当于每次join都只输出最新的一条记录。请问这种场景sql支持吗


thanks