flink????????????????????

2021-12-01 文章 ????


Re:Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 黄志高
|




32684
|
COMPLETED
| 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
| | 32683 |
COMPLETED
| 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
| | 32682 |
COMPLETED
| 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
| | 32681 |
COMPLETED
| 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
| | 32680 |
COMPLETED
| 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
| | 32679 |
COMPLETED
| 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
上图是checkpoint


这个是在11月30号0时段生成的文件
2021-11-30 00:00:011080827 athena_other-0-217891.gz
2021-11-30 00:02:424309209 athena_other-0-217892.gz
2021-11-30 00:12:403902474 athena_other-0-217893.gz
2021-11-30 00:22:403886322 athena_other-0-217894.gz
2021-11-30 00:32:403988037 athena_other-0-217895.gz
2021-11-30 00:42:403892343 athena_other-0-217896.gz
2021-11-30 00:52:392972183 athena_other-0-217897.gz
2021-11-30 00:00:011125774 athena_other-1-219679.gz
2021-11-30 00:02:424338748 athena_other-1-219680.gz
2021-11-30 00:12:404204571 athena_other-1-219681.gz
2021-11-30 00:22:403852791 athena_other-1-219682.gz
2021-11-30 00:32:404025214 athena_other-1-219683.gz
2021-11-30 00:42:404205107 athena_other-1-219684.gz
2021-11-30 00:52:392922192 athena_other-1-219685.gz
2021-11-30 00:00:011103734 athena_other-2-220084.gz


这个是1点生成的文件
2021-11-30 01:00:011228793 athena_other-0-217951.gz
2021-11-30 01:02:424243566 athena_other-0-217952.gz
2021-11-30 01:12:404106305 athena_other-0-217953.gz
2021-11-30 01:22:404456214 athena_other-0-217954.gz
2021-11-30 01:32:414303156 athena_other-0-217955.gz
2021-11-30 01:42:404688872 athena_other-0-217956.gz
2021-11-30 01:52:403251910 athena_other-0-217957.gz
2021-11-30 01:00:011163354 athena_other-1-219736.gz
2021-11-30 01:02:424405233 athena_other-1-219737.gz
2021-11-30 01:12:404094502 athena_other-1-219738.gz
2021-11-30 01:22:404395071 athena_other-1-219739.gz
2021-11-30 01:32:404205169 athena_other-1-219740.gz
2021-11-30 01:42:404432610 athena_other-1-219741.gz
2021-11-30 01:52:403224111 athena_other-1-219742.gz
2021-11-30 01:00:011163964 athena_other-2-220137.gz




之前的截图无法发送,我把文件贴出来,打扰了







在 2021-12-02 13:52:43,"黄志高"  写道:




Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy














在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
>Hi!
>
>邮件里看不到图片和附件,建议使用外部图床。
>
>partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
>exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
>
>黄志高  于2021年12月1日周三 下午9:53写道:
>
>> hi,各位大佬,咨询个问题
>>
>>  
>> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>>
>>
>>
>>





 

ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-01 文章 summer
在我CDH6.3.2集成Flink1.13.3的时候,在执行flink-sql的时候,在日志中会出现这个报错:


ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user 
programmatically provided configurations. Set system property 
'log4j2.debug' to show Log4j 2 internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for 
instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed


请问这是什么原因造成的?

ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed

2021-12-01 文章 summer
在我CDH6.3.2集成Flink1.13.3的时候,在执行flink-sql的时候,在日志中会出现这个报错:


ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user 
programmatically provided configurations. Set system property 
'log4j2.debug' to show Log4j 2 internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html for 
instructions on how to configure Log4j 2
10:45:50.236 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed
JobManager Web Interface: http://lo-t-work3:8081
The Flink Yarn cluster has failed.
10:56:08.877 [main-EventThread] ERROR 
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - 
Authentication failed


请问这是什么原因造成的?

Re:Re: flink remote shuffle example运行出错

2021-12-01 文章 casel.chen
sorry, 应该是我本地切换FLINK_HOME没生效导致的,重新开启一个terminal新环境变量生效后重新操作就正常了,打扰各位了

















在 2021-12-02 10:35:53,"weijie guo"  写道:
>你好,你是在flink standalone模式下提交的作业吗,另外用的flink是官网download的Apache Flink 1.14.0
>for Scala 2.12
>
>吗
>casel.chen  于2021年12月2日周四 上午8:12写道:
>
>> 按照 https://github.com/flink-extended/flink-remote-shuffle 上的指南试着运行flink
>> remote shuffle服务跑一个batch作业,结果报错如下。我本地使用的是scala 2.12
>> 因此编译打包flink-remote-shuffle的时候使用的命令是:mvn clean install -DskipTests
>> -Dscala.binary.version=2.12
>>
>> 报的这个找不到的类是flink-streaming-java_2.12-1.14.0.jar下的,我将该jar包放在$FLINK_HOME/lib目录下也没有作用。本地flink版本是1.14.0
>>
>>
>>
>> java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode
>>
>> at
>> com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>>
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> ... 14 more


Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 Caizhi Weng
Hi!

邮件里看不到图片和附件,建议使用外部图床。

partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。

黄志高  于2021年12月1日周三 下午9:53写道:

> hi,各位大佬,咨询个问题
>
>  
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>
>
>
>


Re: 谁能解释一下 GlobalStreamExchangeMode 这几种交换模式的不同和使用场景吗?

2021-12-01 文章 Yingjie Cao
这个是可以直接控制内部连边的方式,可以参考一下这个的Java doc。不过这个是一个内部接口,还是建议使用
env.setRuntimeMode(RuntimeExecutionMode.BATCH),这个可以参考一下这个文档:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
。

public enum GlobalStreamExchangeMode {
/** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
ALL_EDGES_BLOCKING,

/**
 * Set job edges with {@link ForwardPartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
FORWARD_EDGES_PIPELINED,

/**
 * Set job edges with {@link ForwardPartitioner} or {@link
RescalePartitioner} to be {@link
 * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link
 * ResultPartitionType#BLOCKING}.
 */
POINTWISE_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
ALL_EDGES_PIPELINED,

/** Set all job edges {@link ResultPartitionType#PIPELINED_APPROXIMATE}. */
ALL_EDGES_PIPELINED_APPROXIMATE
}


casel.chen  于2021年12月2日周四 上午8:26写道:

> GlobalStreamExchangeMode 这几种交换模式的不同和使用场景是什么?哪些适合流式作业,哪些适合批式作业?
> Flink Remote Shuffle Service的推出是不是意味着可以在生产环境使用Flink处理批式作业?谢谢!
>
>
> package org.apache.flink.streaming.api.graph;
>
>
>
>
> import org.apache.flink.annotation.Internal;
>
>
>
>
> @Internal
>
> public enum GlobalStreamExchangeMode {
>
> ALL_EDGES_BLOCKING,
>
> FORWARD_EDGES_PIPELINED,
>
> POINTWISE_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED_APPROXIMATE;
>
>
>
>
> private GlobalStreamExchangeMode() {
>
> }
>
> }
>
>
>


Re: flink remote shuffle example运行出错

2021-12-01 文章 Yingjie Cao
看起来flink-streaming-java应该不在class
path下面,如果是官网下的flink,直接在FLINK_HOME下执行./bin/flink
run提交job应该就不会出错。另外这个错误我理解本身和remote shuffle应该关系不大,不用remote
shuffle,应该也会抛。建议是从官网下载一个Flink版本再试一下。


Re: flink remote shuffle example运行出错

2021-12-01 文章 weijie guo
你好,你是在flink standalone模式下提交的作业吗,另外用的flink是官网download的Apache Flink 1.14.0
for Scala 2.12

吗
casel.chen  于2021年12月2日周四 上午8:12写道:

> 按照 https://github.com/flink-extended/flink-remote-shuffle 上的指南试着运行flink
> remote shuffle服务跑一个batch作业,结果报错如下。我本地使用的是scala 2.12
> 因此编译打包flink-remote-shuffle的时候使用的命令是:mvn clean install -DskipTests
> -Dscala.binary.version=2.12
>
> 报的这个找不到的类是flink-streaming-java_2.12-1.14.0.jar下的,我将该jar包放在$FLINK_HOME/lib目录下也没有作用。本地flink版本是1.14.0
>
>
>
> java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode
>
> at
> com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>
> ... 14 more


Re: flink sql collect函数使用问题

2021-12-01 文章 cyril cui
af里acc为个list,merge的时候合并,输出的时候 list拼成string即可

casel.chen  于2021年12月2日周四 上午9:46写道:

> 使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group
> by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql?
> 如果不能的话要怎么写UDAF,有例子参考吗?谢谢!
>
> kafka源表:
> 班级 学号  姓名  年龄
> 1 20001张三   15
> 2 20011李四   16
> 1 20002王五   16
> 2 20012吴六   15
>
> create table source_table (
>class_no: INT,
>student_no: INT,
>name: STRING,
>age: INT
> ) with (
>'connector' = 'kafka',
>...
> );
>
>
>
> 通过flink sql处理输出 ==>
>
>
> mongodb目标表:
> 班级 学生信息
> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
> 20002, "name":"王五", "age": 16}]
> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
> 20012, "name":"吴六", "age": 15}]
>
> create table sink_table (
>   class_no INT,
>   students: ARRAY>
> ) with (
>   'connector' = 'mongodb',
>   ...
> );
>
>


flink sql collect函数使用问题

2021-12-01 文章 casel.chen
使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group 
by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? 
如果不能的话要怎么写UDAF,有例子参考吗?谢谢!

kafka源表:
班级 学号  姓名  年龄
1 20001张三   15
2 20011李四   16
1 20002王五   16
2 20012吴六   15

create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);



通过flink sql处理输出 ==>


mongodb目标表:
班级 学生信息
1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]

create table sink_table (
  class_no INT,
  students: ARRAY>
) with (
  'connector' = 'mongodb',
  ...
);



????

2021-12-01 文章 zdj


谁能解释一下 GlobalStreamExchangeMode 这几种交换模式的不同和使用场景吗?

2021-12-01 文章 casel.chen
GlobalStreamExchangeMode 这几种交换模式的不同和使用场景是什么?哪些适合流式作业,哪些适合批式作业?
Flink Remote Shuffle Service的推出是不是意味着可以在生产环境使用Flink处理批式作业?谢谢!


package org.apache.flink.streaming.api.graph;




import org.apache.flink.annotation.Internal;




@Internal

public enum GlobalStreamExchangeMode {

ALL_EDGES_BLOCKING,

FORWARD_EDGES_PIPELINED,

POINTWISE_EDGES_PIPELINED,

ALL_EDGES_PIPELINED,

ALL_EDGES_PIPELINED_APPROXIMATE;




private GlobalStreamExchangeMode() {

}

}




flink remote shuffle example运行出错

2021-12-01 文章 casel.chen
按照 https://github.com/flink-extended/flink-remote-shuffle 上的指南试着运行flink remote 
shuffle服务跑一个batch作业,结果报错如下。我本地使用的是scala 2.12 
因此编译打包flink-remote-shuffle的时候使用的命令是:mvn clean install -DskipTests 
-Dscala.binary.version=2.12
报的这个找不到的类是flink-streaming-java_2.12-1.14.0.jar下的,我将该jar包放在$FLINK_HOME/lib目录下也没有作用。本地flink版本是1.14.0



java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode

at com.alibaba.flink.shuffle.examples.BatchJobDemo.main(BatchJobDemo.java:51)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)

at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)

at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

... 14 more

Re:Re: flink访问多个oss bucket问题

2021-12-01 文章 casel.chen









fs.oss.credentials.provider可以指定两个不同的provider吗?
如何区分是写数据的provider,还是做checkpoint/savepoint用的provider呢?










在 2021-12-01 10:58:18,"Caizhi Weng"  写道:
>Hi!
>
>如果只是 bucket 不同的话,通过在 with 参数里指定 path 即可。
>
>如果连 ak id 和 secret
>都不同,可以考虑实现自己的 com.aliyun.oss.common.auth.CredentialsProvider 接口,并在 flink
>conf 中指定 fs.oss.credentials.provider 为对应的实现类。
>
>casel.chen  于2021年12月1日周三 上午8:14写道:
>
>> flink平台作业写数据到客户oss bucket,和flink平台作业本身做checkpoint/savepoint用的oss
>> bucket不是同一个。
>> 请问这种场景flink是否支持,如果支持的话应该要怎么配置?谢谢!


flink sql group by后收集数据集合问题

2021-12-01 文章 casel.chen
业务中使用flink sql group by操作后想收集每个分组下所有的数据,如下示例:


kafka源表:
班级 学号  姓名  年龄
1 20001张三   15
2 20011李四   16
1 20002王五   16
2 20012吴六   15


create table source_table (
   class_no: INT,
   student_no: INT,
   name: STRING,
   age: INT
) with (
   'connector' = 'kafka',
   ...
);


mongodb目标表:
班级 学生信息
1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, 
"name":"王五", "age": 16}]
2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, 
"name":"吴六", "age": 15}]


create table sink_table (
  class_no INT,
  students: ARRAY>
) with (
  'connector' = 'mongodb',
  ...
);


查了下flink自带的系统函数,接近满足条件的只有collect函数。
insert into sink_table select class_no, collect(ROW(student_no, name, age) from 
source_table group by class_no;


但它返回的是Multiset类型,即Map。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。


三个问题: 
1. 如果要收集去重的Array[ROW],有什么办法可以做到吗?想尽量做得通用,收集的数据类型是ROW,而不只是case by 
case的针对特定类型的udf,如这里的ROW
2. 如果要收集不去重的Array[ROW],又该怎么写?
3. 访问一个数据类型为Map的数据中key和value,要用什么flink sql语法?


谢谢解答!

关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 黄志高
hi,各位大佬,咨询个问题
   
我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看

Re: flink sql lookup join中维表不可以是视图吗?

2021-12-01 文章 Tony Wei
Hi,

如果兩次 left join 的話是否滿足你的需求呢?
然後在取 temporal table 的字段時,用 IF 去判斷取值。參考 SQL 如下

SELECT
  c.mer_cust_id,
  *IF(k.mer_cust_id IS NOT NULL AND a.mercust_id IS NOT NULL AND
k.mer_cust_id <> '', k.update_time, NULL) AS update_time*
FROM charge_log as c
  LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF c.proc_time AS k
ON c.mer_cust_id = k.mer_cust_id
  LEFT JOIN adp_mer_user_info FOR SYSTEM_TIME AS OF c.proc_time AS a
ON c.mer_cust_id = a.mer_cust_id

不過,這種寫法只能適用在兩張 MySQL 表都保證 mer_cust_id 是唯一主鍵的狀況下。如果 mer_cust_id
不是唯一的話,輸出的結果數量會跟原本提供的 SQL 期望的輸出不一致
比如說 ka_mer_info 有 0 筆數據, adp_mer_user_info 有 2 筆數據,原先的 SQL 會得到 1 筆 left
join 沒成功的數據,上面提供的 SQL 則會輸出 2 筆。


casel.chen  於 2021年12月1日 週三 下午6:33寫道:

> lookup join用的维表需要从两张mysql表做关联后得到,因此创建了一个视图。但发现flink sql不支持lookup
> join关联视图,会抛
> Temporal Table Join requires primary key in versioned table, but no
> primary key can be found.
>
>
> 请问这种情况要怎么解决?
>
>
> CREATE VIEW query_mer_view (mer_cust_id, update_time) AS
> SELECT a.mer_cust_id, k.update_time
> FROM ka_mer_info k INNER JOIN adp_mer_user_info a on k.mer_cust_id =
> a.mer_cust_id
> where k.mer_cust_id <> '';
>
>
> SELECT
> DATE_FORMAT(c.create_time, '-MM-dd') AS payment_date,
> c.mer_cust_id,
>
>
> c.trans_amt,
> CASE c.trans_stat WHEN 'S' THEN c.trans_amt ELSE 0 END as
> succ_amt ,
>
>
> 1 as trans_cnt,
> CASE c.trans_stat WHEN 'S' THEN 1 ELSE 0  END as succ_cnt ,
> CASE c.trans_stat WHEN 'F' THEN 1 ELSE 0  END as fail_cnt
>
>
> FROM charge_log as c
>  LEFT JOIN query_mer_view FOR SYSTEM_TIME AS OF
> c.proc_time AS q
>  ON c.mer_cust_id = q.mer_cust_id;


flink sql lookup join中维表不可以是视图吗?

2021-12-01 文章 casel.chen
lookup join用的维表需要从两张mysql表做关联后得到,因此创建了一个视图。但发现flink sql不支持lookup join关联视图,会抛
Temporal Table Join requires primary key in versioned table, but no primary key 
can be found. 


请问这种情况要怎么解决?


CREATE VIEW query_mer_view (mer_cust_id, update_time) AS
SELECT a.mer_cust_id, k.update_time
FROM ka_mer_info k INNER JOIN adp_mer_user_info a on k.mer_cust_id = 
a.mer_cust_id
where k.mer_cust_id <> '';


SELECT
DATE_FORMAT(c.create_time, '-MM-dd') AS payment_date,
c.mer_cust_id,


c.trans_amt,
CASE c.trans_stat WHEN 'S' THEN c.trans_amt ELSE 0 END as succ_amt ,


1 as trans_cnt,
CASE c.trans_stat WHEN 'S' THEN 1 ELSE 0  END as succ_cnt ,
CASE c.trans_stat WHEN 'F' THEN 1 ELSE 0  END as fail_cnt


FROM charge_log as c
 LEFT JOIN query_mer_view FOR SYSTEM_TIME AS OF c.proc_time AS q
 ON c.mer_cust_id = q.mer_cust_id;

Flink checkpoint文件大小与对应内存大小映射关系

2021-12-01 文章 mayifan
Hi,All~!

麻烦大家一个问题,有大佬了解过checkpoint文件大小与实际内存对应的状态数据大小的映射关系吗?

比如Fs状态后端checkpoint后文件大小是1MB,对应的状态数据在内存中占用大概是多少呢?

感谢答复~!

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-01 文章 Yingjie Cao
Hi Jiangang,

Great to hear that, welcome to work together to make the project better.

Best,
Yingjie

刘建刚  于2021年12月1日周三 下午3:27写道:

> Good work for flink's batch processing!
> Remote shuffle service can resolve the container lost problem and reduce
> the running time for batch jobs once failover. We have investigated the
> component a lot and welcome Flink's native solution. We will try it and
> help improve it.
>
> Thanks,
> Liu Jiangang
>
> Yingjie Cao  于2021年11月30日周二 下午9:33写道:
>
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1]
> for
> > Flink. The project is originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and
> further
> > embrace cloud native. For more features about the project, please refer
> to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaborations and feedbacks are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
>


Re:Re: flink sql group by后收集数据问题

2021-12-01 文章 casel.chen



我想要的是一个通用的收集ROW类型集合(ARRAY去重和不去重),不是只针对特定ROW
@DataTypeHint("ROW") 这样写没有问题@DataTypeHint("ROW") 
这样写会报错














在 2021-12-01 11:12:27,"Caizhi Weng"  写道:
>Hi!
>
>UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
>casel.chen  于2021年12月1日周三 上午7:56写道:
>
>> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>>
>>
>> kafka源表:
>> 班级 学号  姓名  年龄
>> 1 20001张三   15
>> 2 20011李四   16
>> 1 20002王五   16
>> 2 20012吴六   15
>>
>>
>> create table source_table (
>>class_no: INT,
>>student_no: INT,
>>name: STRING,
>>age: INT
>> ) with (
>>'connector' = 'kafka',
>>...
>> );
>>
>>
>> mongodb目标表:
>> 班级 学生信息
>> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
>> 20002, "name":"王五", "age": 16}]
>> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
>> 20012, "name":"吴六", "age": 15}]
>>
>>
>> create table sink_table (
>>   class_no INT,
>>   students: ARRAY>
>> ) with (
>>   'connector' = 'mongodb',
>>   ...
>> );
>>
>>
>> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
>> insert into sink_table select class_no, collect(ROW(student_no, name, age)
>> from source_table group by class_no;
>>
>>
>> 但它返回的是Multiset类型,即Map> Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
>> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>>
>>
>> 1.
>> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
>> 2. 如果要收集不去重的Array[ROW],又该怎么写?
>> 3. 访问一个数据类型为Map的数据中key和value,分别要用什么flink sql语法?
>>
>>
>> 谢谢解答!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>