Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询
Hi, 你的需求是什么?下列哪种? - 1.想用unbounded source,continuous的file source,监控文件夹,发送新文件,且需要支持多文件夹 - 2.只是想用bounded的input format,需要支持多文件 如果是1,现在仍然不支持。 如果是2,那你可以用env.addSource(new InputFormatSourceFunction(..)...)来支持多文件。 Best, Jingsong Lee -- From:王智 Send Time:2020年3月4日(星期三) 17:34 To:user-zh Subject:flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询 我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~ 问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么? 相关的代码描述如下 StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑 if (inputFormat instanceof FileInputFormat) { @SuppressWarnings("unchecked") FileInputFormat
Re: 使用Flink1.10.0读取hive时source并行度问题
Hi jun, Jira: https://issues.apache.org/jira/browse/FLINK-16413 FYI Best, Jingsong Lee -- From:JingsongLee Send Time:2020年3月3日(星期二) 19:06 To:Jun Zhang <825875...@qq.com>; user-zh@flink.apache.org Cc:user-zh@flink.apache.org ; like Subject:Re: 使用Flink1.10.0读取hive时source并行度问题 Hi jun, 很好的建议~ 这是一个优化点~ 可以建一个JIRA Best, Jingsong Lee -- From:Jun Zhang <825875...@qq.com> Send Time:2020年3月3日(星期二) 18:45 To:user-zh@flink.apache.org ; JingsongLee Cc:user-zh@flink.apache.org ; like Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 hi,jinsong: 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10, 我有一个类似的sql select * from mytable limit 1; hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。 在2020年03月2日 16:38,JingsongLee 写道: 建议使用Batch模式来读取Hive table。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 16:35 To:lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。 在2020年3月2日 16:16,JingsongLee 写道: 自动推断可能面临资源不足无法启动的问题 理论上不应该呀?Batch作业是可以部分运行的。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 15:35 To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 在2020年3月2日 15:18,JingsongLee 写道: Hi, 1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发: - table.exec.hive.infer-source-parallelism=true (默认使用自动推断) - table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发) Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 14:58 To:user-zh@flink.apache.org Subject:使用Flink1.10.0读取hive时source并行度问题 hi,大家好 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的, 而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?
Re: 使用Flink1.10.0读取hive时source并行度问题
Hi jun, 很好的建议~ 这是一个优化点~ 可以建一个JIRA Best, Jingsong Lee -- From:Jun Zhang <825875...@qq.com> Send Time:2020年3月3日(星期二) 18:45 To:user-zh@flink.apache.org ; JingsongLee Cc:user-zh@flink.apache.org ; like Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 hi,jinsong: 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10, 我有一个类似的sql select * from mytable limit 1; hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。 在2020年03月2日 16:38,JingsongLee 写道: 建议使用Batch模式来读取Hive table。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 16:35 To:lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。 在2020年3月2日 16:16,JingsongLee 写道: 自动推断可能面临资源不足无法启动的问题 理论上不应该呀?Batch作业是可以部分运行的。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 15:35 To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 在2020年3月2日 15:18,JingsongLee 写道: Hi, 1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发: - table.exec.hive.infer-source-parallelism=true (默认使用自动推断) - table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发) Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 14:58 To:user-zh@flink.apache.org Subject:使用Flink1.10.0读取hive时source并行度问题 hi,大家好 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的, 而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?
Re: 开发相关问题咨询Development related problems consultation
Hi, welcome, For user side, u...@flink.apache.org is for English. user-zh@flink.apache.org is for Chinese. d...@flink.apache.org is for development related discussions, so please not send to it. Best, Jingsong Lee -- From:王博迪 Send Time:2020年3月2日(星期一) 17:08 To:user-zh ; dev Subject:开发相关问题咨询Development related problems consultation 您好, 我是你们flink的新用户,有一些开发相关的问题想咨询,问一下可以和哪个邮箱交流。 谢谢 Hello, I am a new user of flink. I would like to ask you some questions related to development. I would like to know which email can I communicate with
Re: 使用Flink1.10.0读取hive时source并行度问题
建议使用Batch模式来读取Hive table。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 16:35 To:lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。 在2020年3月2日 16:16,JingsongLee 写道: > 自动推断可能面临资源不足无法启动的问题 理论上不应该呀?Batch作业是可以部分运行的。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 15:35 To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 在2020年3月2日 15:18,JingsongLee 写道: Hi, 1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发: - table.exec.hive.infer-source-parallelism=true (默认使用自动推断) - table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发) Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 14:58 To:user-zh@flink.apache.org Subject:使用Flink1.10.0读取hive时source并行度问题 hi,大家好 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的, 而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?
Re: 使用Flink1.10.0读取hive时source并行度问题
> 自动推断可能面临资源不足无法启动的问题 理论上不应该呀?Batch作业是可以部分运行的。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 15:35 To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com Subject:回复: 使用Flink1.10.0读取hive时source并行度问题 非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 在2020年3月2日 15:18,JingsongLee 写道:Hi, 1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发: - table.exec.hive.infer-source-parallelism=true (默认使用自动推断) - table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发) Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 14:58 To:user-zh@flink.apache.org Subject:使用Flink1.10.0读取hive时source并行度问题 hi,大家好 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的, 而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?
Re: 使用Flink1.10.0读取hive时source并行度问题
Hi, 1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发: - table.exec.hive.infer-source-parallelism=true (默认使用自动推断) - table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发) Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。 Best, Jingsong Lee -- From:like Send Time:2020年3月2日(星期一) 14:58 To:user-zh@flink.apache.org Subject:使用Flink1.10.0读取hive时source并行度问题 hi,大家好 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的, 而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?
Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错
+user-zh -- From:JingsongLee Send Time:2020年1月15日(星期三) 16:05 To:Others <41486...@qq.com> Subject:Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错 是的。 另一个方法是使用[1]的classpath,添加多个jars。 BTW, 回复邮件时请带上user-zh。 Best, Jingsong Lee [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage -- From:Others <41486...@qq.com> Send Time:2020年1月15日(星期三) 15:54 To:user-zh@flink.apache.org JingsongLee Subject:回复: Re: Re: 求助帖:flink 连接kafka source 部署集群报错 Hi, 我的集群 是Standalone 方式部署的 是加在 Flink Master机器下么 还是每一台都要加? 加完之后是否需要重启集群? -- 原始邮件 -- 发件人: "JingsongLee"; 发送时间: 2020年1月15日(星期三) 下午3:46 收件人: "Others"<41486...@qq.com>;"user-zh"; 主题: Re: Re: 求助帖:flink 连接kafka source 部署集群报错 Hi, 我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。 你试试把flink-json和flink-kafka的jar直接放入flink/lib下 Best, Jingsong Lee -- From:Others <41486...@qq.com> Send Time:2020年1月15日(星期三) 15:27 To:user-zh@flink.apache.org JingsongLee Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错 集群环境下应该放在哪个lib下? 一下是打包过程的log [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob --- [INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar. [INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar. [INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar. [INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar. [INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar. [INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar. [INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar. [INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar. [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar. [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar. [INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar. [INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar. [INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar. [INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar. [INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar. [INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar. [INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar. [INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar. [INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar. [INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar. [INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar. [INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar. [INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar. [INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar. [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar. [INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar. [INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar. [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar. [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar. [WARNING] janino-3.0.9.ja
Re: Re: 求助帖:flink 连接kafka source 部署集群报错
Hi, 我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。 你试试把flink-json和flink-kafka的jar直接放入flink/lib下 Best, Jingsong Lee -- From:Others <41486...@qq.com> Send Time:2020年1月15日(星期三) 15:27 To:user-zh@flink.apache.org JingsongLee Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错 集群环境下应该放在哪个lib下? 一下是打包过程的log [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob --- [INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar. [INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar. [INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar. [INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar. [INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar. [INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar. [INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar. [INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar. [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar. [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar. [INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar. [INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar. [INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar. [INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar. [INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar. [INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar. [INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar. [INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar. [INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar. [INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar. [INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar. [INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar. [INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar. [INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar. [INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar. [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar. [INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar. [INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar. [INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar. [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar. [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar. [WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes: [WARNING] - org.codehaus.janino.util.resource.ResourceCreator [WARNING] - org.codehaus.janino.ReflectionIClass$ReflectionIField [WARNING] - org.codehaus.janino.IClass$1 [WARNING] - org.codehaus.janino.UnitCompiler$35 [WARNING] - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration [WARNING] - org.codehaus.janino.Java$PackageMemberEnumDeclaration [WARNING] - org.codehaus.janino.UnitCompiler$13$1 [WARNING] - org.codehaus.janino.Unparser [WARNING] - org.codehaus.janino.CodeContext$Branch [WARNING] - org.codehaus.janino.UnitCompiler$33$2 [WARNING] - 430 more... [WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes: [WARNING] - org.apache.calcite.avatica.AvaticaParameter [WARNING] - org.apache.calcite.avatic
Re: 求助帖:flink 连接kafka source 部署集群报错
Hi, 你是不是没有把Json的jar包放入lib下?看起来你的User jar也没用jar-with-dependencies,所以也不会包含json的jar。 Best, Jingsong Lee -- From:Others <41486...@qq.com> Send Time:2020年1月15日(星期三) 15:03 To:user-zh Subject:求助帖:flink 连接kafka source 部署集群报错 我使用的flink 版本 是1.9.1 本地调试正常。部署集群启动时报一下错误 2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled exception. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) ... 9 more Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'. The following properties are requested: connector.properties.0.key=group.id connector.properties.0.value=consumer_flink_etl_test connector.properties.1.key=bootstrap.servers connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092 connector.property-version=1 connector.startup-mode=latest-offset connector.topic=flink_etl_pro connector.type=kafka connector.version=universal format.derive-schema=true format.fail-on-missing-field=false format.property-version=1 format.type=json schema.0.name=rowtime schema.0.rowtime.timestamps.from=cTime schema.0.rowtime.timestamps.type=from-field schema.0.rowtime.watermarks.delay=2000 schema.0.rowtime.watermarks.type=periodic-bounded schema.0.type=TIMESTAMP schema.1.name=event schema.1.type=VARCHAR schema.2.name=adSpaceKey schema.2.type=VARCHAR schema.3.name=appkey schema.3.type=VARCHAR schema.4.name=build schema.4.type=VARCHAR schema.5.name=buoyId schema.5.type=BIGINT schema.6.name=gameHtmlId schema.6.type=BIGINT schema.7.name=uid schema.7.type=VARCHAR update-mode=append The following factories have been considered: org.apache.flink.table.catalog.GenericInMemoryCatalogFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.planner.delegation.BlinkPlannerFactory org.apache.flink.table.planner.delegation.BlinkExecutorFactory org.apache.flink.table.planner.StreamPlannerFactory org.apache.flink.table.executor.StreamExecutorFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259) at
Re: 求助帖: 流join场景可能出现的重复计算
Hi ren, Blink的deduplication功能应该是能match你的需求。[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication Best, Jingsong Lee -- From:Caizhi Weng Send Time:2020年1月15日(星期三) 11:53 To:user-zh Subject:Re: 求助帖: 流join场景可能出现的重复计算 Hi, Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。 所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。 Ren Xie 于2020年1月14日周二 下午9:30写道: > 谢谢 > > 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化. > > 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧 > > 还是说我这样的需求呀 实现呀 是野路子? > > Yuan,Youjun 于2020年1月14日周二 下午8:22写道: > > > 取决于具体的场景。想到的有如下几种方案: > > 1,group by student_id和student_name,而不是只group by > > student_id。当然前提是修改同名名字不会推送一条消息到流1. > > 2,过滤掉update的消息 > > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。 > > > > -邮件原件- > > 发件人: xin Destiny > > 发送时间: Tuesday, January 14, 2020 6:39 PM > > 收件人: user-zh@flink.apache.org > > 主题: Re: 求助帖: 流join场景可能出现的重复计算 > > > > Hi, > > 如果说插入两条update操作呢,一次分数是-97,一次是97 > > > > > > > > > > Ren Xie 于2020年1月14日周二 下午6:20写道: > > > > > 实际场景还是有点复杂的, 便于理解 我简化成这样的, 简化后的这个, 没有实际的代码, 抱歉 > > > > > > 大致 写一下 也就是这样了 > > > ```sql > > > select sum(score) > > > from > > > student t1 inner join score t2 on t1.student_id = t2.std_id where > > > t1.student_id = 11 > > > ``` > > > 然后 > > > > > > ```Java > > > String sql = ↑; > > > Table t = tEnv.sqlQuery(sql); > > > DataStream stream1 = tEnv.toAppendStream(t, Integer.class); > > > stream1.keyBy("").sum(""); > > > ``` > > > > > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98 > > > > > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98 > > > > > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次 > > > > > > > > > Caizhi Weng 于2020年1月14日周二 下午5:49写道: > > > > > > > Hi, > > > > > > > > 有可能的话,是否方便提供一下代码呢? > > > > > > > > Ren Xie 于2020年1月14日周二 下午5:38写道: > > > > > > > > > 学生 > > > > > student_id name > > > > > 11 foo > > > > > > > > > > 学科分数 > > > > > id name score std_id > > > > > 100 math 97 11 > > > > > 101 english 98 11 > > > > > > > > > > 有如下一个场景(假设只有一个学生) > > > > > > > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算 > > > > > > > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195 > > > > > > > > > > 但此时发现学生姓名登记错误, 于是进行了修改, > > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * > > > > > (97 > > > + > > > > > 98) = 390 > > > > > > > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193 > > > > > > > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!! > > > > > > > > > > > > > > >
Re: blink planner的org.apache.flink.table.api.ValidationException报错
谢谢, 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。 Best, Jingsong Lee -- From:Kevin Liao Send Time:2020年1月14日(星期二) 11:38 To:user-zh ; JingsongLee Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错 flink 版本是 1.9.1 release Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 30 多个字段,我理解这跟字段数关系不大 ``` import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; /** * @author liaoxu Date: 2020/1/13 Time: 12:03 下午. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Doc { private String suv; private Float factor = 1F; private String st; private String agentId; private Long timestamp; ... // omit some, omit getters and setters ``` 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy) JingsongLee 于2020年1月14日周二 上午11:25写道: Hi Kevin, 这是什么版本? Doc类能完整提供下吗?方便我们复现。 Best, Jingsong Lee -- From:Kevin Liao Send Time:2020年1月13日(星期一) 17:37 To:user-zh Subject:blink planner的org.apache.flink.table.api.ValidationException报错 tEnv.connect(new Kafka() .version("universal") .topic("xxx") .startFromLatest() .property("bootstrap.servers", "") .property("group.id", "")) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(new Schema() //.field("logger_name", Types.STRING) //.field("host", Types.STRING) //.field("@timestamp", Types.SQL_TIMESTAMP) //.field("_rowtime", Types.SQL_TIMESTAMP) //.rowtime( //new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000)) .field("doc", Types.POJO(Doc.class)) ) .inAppendMode() .registerTableSource("xxx"); Table result = tEnv.sqlQuery( "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx"); //result.printSchema(); tEnv.toAppendStream(result, new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING, STRING, STRING, STRING, STRING, LONG, STRING, INT, STRING, INT)).print(); 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下: 、、、 Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType) of table field 'doc' does not match with type PojoType of the field 'doc' of the TableSource return type. at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154) at org.apache.flink.table.planner.plan.nodes.physica
Re: 注册table时catalog无法变更
Hi xiyueha, 你可以用TableEnv.sqlUpdate("create table ...")的DDL的方式,这会注册到当前catalog中。 Best, Jingsong Lee -- From:Kurt Young Send Time:2020年1月8日(星期三) 09:17 To:user-zh Cc:xiyueha Subject:Re: 注册table时catalog无法变更 临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。 临时表大部分情况下是不能序列化的,那样的话代码会报错。 Best, Kurt On Tue, Jan 7, 2020 at 9:20 PM 贺小令 wrote: > hi, > > streamTableEnvironment.registerDataStream(tableName, dataStream, fields); > 注册的表都是Temporary Table。 > > 你可以通过: > catalog = new InMemoryExternalCatalog(catalogName); > streamTableEnvironment.registerCatalog(catalogName, catalog); > catalog.createTable() > > 或者 > streamTableEnvironment.getCatalog().get().createTable() > > 的方式来注册表到指定的catalog > > > xiyu...@163.com 于2020年1月7日周二 下午3:20写道: > > > hi,各位: > > > > > 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG > > streamTableEnvironment.registerDataStream(tableName, dataStream, > > > fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中 > > streamTableEnvironment.registerCatalog(catalogName, new > > InMemoryExternalCatalog(catalogName)); > > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog? > > > > > > xiyu...@163.com > > >
Re: FLINK 1.9.1 StreamingFileSink 压缩问题
Hi, 看起来你只能改下connector代码才能支持压缩了: ParquetAvroWriters.createAvroParquetWriter里:设置AvroParquetWriter.Builder的压缩格式。 Best, Jingsong Lee -- From:USERNAME Send Time:2020年1月2日(星期四) 13:36 To:user-zh Subject:FLINK 1.9.1 StreamingFileSink 压缩问题 各位好,FLINK 1.9.1 使用 StreamingFileSink 写Parquet到HDFS,能启用压缩吗? --代码 StreamingFileSink sink = StreamingFileSink .forBulkFormat(new Path(FILE_HDFS_PATH), ParquetAvroWriters.forReflectRecord(HDFSBean.class)) .withBucketAssigner(new DateTimeBucketAssigner<>(FILE_HDFS_FORMAT)) .build();
Re: How should i set the field type in mysql when i use temporal table join between kafka and jdbc ?
Hi, user-zh我就说中文啦. 你需要设置成bigint. 具体报什么错? Best, Jingsong Lee -- From:刘世民 Send Time:2020年1月2日(星期四) 13:47 To:user-zh Subject:How should i set the field type in mysql when i use temporal table join between kafka and jdbc ? for example, num_count field type is Long, but no matter if i set it to bigint or something else in mysql table, it has always report errorr... so,what can i should set num_count field type in mysql? thanks Best ! amenhub
Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema
Hi aven, 这是个合理的需求。 现在的问题是: - Flink table只支持Row, Pojo, Tuple, CaseClass作为结构化的数据类型。 - 而你的类型是JSONObject,它其实也是一个结构化的数据类型,但是目前Flink不支持它,所以可以考虑有这样的DeserializationSchema机制来支持它。 但是我理解其实没有差别多少,比如你提供RowDeserializationSchema,其实就是JSONObject到Row的转换,那你完全可以把这个套在DataStream.map中,把它转换成Flink table支持的结构化类型。 Best, Jingsong Lee -- From:aven.wu Send Time:2019年12月31日(星期二) 14:09 To:user-zh@flink.apache.org Subject:回复: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema 你好! “把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。 如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。 best wish 发送自 Windows 10 版邮件应用 发件人: Terry Wang 发送时间: 2019年12月30日 12:37 收件人: user-zh@flink.apache.org 主题: Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema 你这种需求的一种解决思路,可以把 JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。 Best, Terry Wang > 2019年12月27日 19:56,aven.wu 写道: > > StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 > 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema. > > > 发送自 Windows 10 版邮件应用 >
Re: Flink1.9批任务yn和ys对任务的影响
SQL的Batch作业是会关闭slotsharing的。 Best, Jingsong Lee -- From:faaron zheng Send Time:2019年12月26日(星期四) 17:23 To:user-zh@flink.apache.org Subject:回复:Flink1.9批任务yn和ys对任务的影响 了解了,感谢三位。我的slot上包括一个hash-join一个hash-agg,加起来刚好256mb。不过因为存在slotsharing的原因,感觉并不容易提前判断。 faaron zheng 邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 15:09,JingsongLee 写道: Hi faaron zheng, 如kurt所说,强烈建议使用1.10,现在已拉分支。 TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。 Best, Jingsong Lee -- From:Kurt Young Send Time:2019年12月26日(星期四) 14:07 To:user-zh Subject:Re: Flink1.9批任务yn和ys对任务的影响 也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量, 而是根据当时 slot 能提供多少 managed 内存来自适应了。 Best, Kurt On Thu, Dec 26, 2019 at 1:36 PM Xintong Song wrote: > slot需要多少内存是和具体作业相关的,不同作业差别会比较大。 > > slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。 > 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。 > > 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with > profile"就能够看到slot的资源需求。 > > Thank you~ > > Xintong Song > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options > > On Thu, Dec 26, 2019 at 11:36 AM faaron zheng > wrote: > > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道: > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: > > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM > > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink > > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot > 的managed > > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 > > AM faaron zheng wrote: > 跑tpcds的query1: flink > run > > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink > > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g > > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to > > > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: > > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 >
Re: Flink1.9批任务yn和ys对任务的影响
Hi faaron zheng, 如kurt所说,强烈建议使用1.10,现在已拉分支。 TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。 Best, Jingsong Lee -- From:Kurt Young Send Time:2019年12月26日(星期四) 14:07 To:user-zh Subject:Re: Flink1.9批任务yn和ys对任务的影响 也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量, 而是根据当时 slot 能提供多少 managed 内存来自适应了。 Best, Kurt On Thu, Dec 26, 2019 at 1:36 PM Xintong Song wrote: > slot需要多少内存是和具体作业相关的,不同作业差别会比较大。 > > slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。 > 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。 > > 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with > profile"就能够看到slot的资源需求。 > > Thank you~ > > Xintong Song > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options > > On Thu, Dec 26, 2019 at 11:36 AM faaron zheng > wrote: > > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道: > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: > > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM > > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink > > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot > 的managed > > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 > > AM faaron zheng wrote: > 跑tpcds的query1: flink > run > > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink > > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g > > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to > > > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: > > > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 >
Re: Flink实时数仓落Hive一般用哪种方式好?
实时性取决于checkpoint时间间隔。 Flink这边的sink没有合并小文件的功能。 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 21:45 To:JingsongLee Subject:Re: Flink实时数仓落Hive一般用哪种方式好? 我想要的streaming写就是数据实时写入HDFS文件,场景有实时数据仓库等。需要平衡实时性以及小文件过多的问题。目前处理小文件问题的方法都是在事后合并文件吗? JingsongLee 于2019年12月10日周二 上午10:48写道: Hi 陈帅, 1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动 2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗? Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 08:21 To:user-zh@flink.apache.org ; JingsongLee Subject:Re: Flink实时数仓落Hive一般用哪种方式好? 1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢? 2. BulkWriter是不是攒微批写文件的? JingsongLee 于2019年12月9日周一 下午3:24写道: Hi 帅, - 目前可以通过改写StreamingFileSink的方式来支持Parquet。 (但是目前StreamingFileSink支持ORC比较难) - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1] [1] https://issues.apache.org/jira/browse/FLINK-14249 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月8日(星期日) 10:04 To:user-zh@flink.apache.org Subject:Flink实时数仓落Hive一般用哪种方式好? 有人说直接写到HBase,再在Hive关联Hbase表 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白: 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC) 写的话,目前来看没有现成的Streaming Writer,官方提供的都是 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于 业务上的Update和Delete操作 数据一般是如何sync进Hive的? 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?
Re: Flink RetractStream如何转成AppendStream?
目前不能由SQL直接转。 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 21:48 To:JingsongLee Subject:Re: Flink RetractStream如何转成AppendStream? 代码api的方式我知道怎么转,想知道用sql的方式要如何转?需要先写到一张临时表再sink到目标表?有例子吗? JingsongLee 于2019年12月10日周二 上午10:49写道: 参考下lucas.wu的例子? Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 08:25 To:user-zh@flink.apache.org ; JingsongLee Subject:Re: Flink RetractStream如何转成AppendStream? "你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。" ==>我想知道通过Flink SQL方式要如何实现这种转换? JingsongLee 于2019年12月9日周一 下午3:17写道: Hi 帅, 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。 Best, Jingsong Lee -- From:Jark Wu Send Time:2019年12月8日(星期日) 11:54 To:user-zh Subject:Re: Flink RetractStream如何转成AppendStream? Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 2019 at 10:08, 陈帅 wrote: > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka > > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka? >
Re: Re: Flink实时数仓落Hive一般用哪种方式好?
Hi hjxhainan, 如果你要取消订阅。 请发送邮件到user-zh-unsubscr...@flink.apache.org Best, Jingsong Lee -- From:hjxhai...@163.com Send Time:2019年12月10日(星期二) 10:52 To:user-zh ; JingsongLee ; 陈帅 Subject:Re: Re: Flink实时数仓落Hive一般用哪种方式好? 怎么退出邮件订阅 hjxhai...@163.com 发件人: JingsongLee 发送时间: 2019-12-10 10:48 收件人: 陈帅; user-zh@flink.apache.org 主题: Re: Flink实时数仓落Hive一般用哪种方式好? Hi 陈帅, 1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动 2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗? Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 08:21 To:user-zh@flink.apache.org ; JingsongLee Subject:Re: Flink实时数仓落Hive一般用哪种方式好? 1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢? 2. BulkWriter是不是攒微批写文件的? JingsongLee 于2019年12月9日周一 下午3:24写道: Hi 帅, - 目前可以通过改写StreamingFileSink的方式来支持Parquet。 (但是目前StreamingFileSink支持ORC比较难) - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1] [1] https://issues.apache.org/jira/browse/FLINK-14249 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月8日(星期日) 10:04 To:user-zh@flink.apache.org Subject:Flink实时数仓落Hive一般用哪种方式好? 有人说直接写到HBase,再在Hive关联Hbase表 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白: 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC) 写的话,目前来看没有现成的Streaming Writer,官方提供的都是 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于 业务上的Update和Delete操作 数据一般是如何sync进Hive的? 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?
Re: Flink RetractStream如何转成AppendStream?
参考下lucas.wu的例子? Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 08:25 To:user-zh@flink.apache.org ; JingsongLee Subject:Re: Flink RetractStream如何转成AppendStream? "你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。" ==>我想知道通过Flink SQL方式要如何实现这种转换? JingsongLee 于2019年12月9日周一 下午3:17写道: Hi 帅, 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。 Best, Jingsong Lee -- From:Jark Wu Send Time:2019年12月8日(星期日) 11:54 To:user-zh Subject:Re: Flink RetractStream如何转成AppendStream? Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 2019 at 10:08, 陈帅 wrote: > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka > > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka? >
Re: Flink实时数仓落Hive一般用哪种方式好?
Hi 陈帅, 1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动 2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗? Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月10日(星期二) 08:21 To:user-zh@flink.apache.org ; JingsongLee Subject:Re: Flink实时数仓落Hive一般用哪种方式好? 1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢? 2. BulkWriter是不是攒微批写文件的? JingsongLee 于2019年12月9日周一 下午3:24写道: Hi 帅, - 目前可以通过改写StreamingFileSink的方式来支持Parquet。 (但是目前StreamingFileSink支持ORC比较难) - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1] [1] https://issues.apache.org/jira/browse/FLINK-14249 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月8日(星期日) 10:04 To:user-zh@flink.apache.org Subject:Flink实时数仓落Hive一般用哪种方式好? 有人说直接写到HBase,再在Hive关联Hbase表 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白: 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC) 写的话,目前来看没有现成的Streaming Writer,官方提供的都是 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于 业务上的Update和Delete操作 数据一般是如何sync进Hive的? 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?
Re: [flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?
Hi 猫猫: 在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1] 你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。 [2] 中有使用的完整例子,FYI。 [1] https://issues.apache.org/jira/browse/FLINK-14320 [2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala Best, Jingsong Lee -- From:猫猫 <16770...@qq.com> Send Time:2019年12月6日(星期五) 17:52 To:user-zh Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime? 我使用tableEnv.sqlUpdate(ddl);方式创建表 但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。 请问在flink中是否支持使用该种方式创建流表,并开窗? 我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。 sql创表语句如下: CREATE TABLE T_UserBehavior( userId BIGINT, itemId BIGINT, categoryId BIGINT, behavior VARCHAR, optime BIGINT ) WITH ( 'connector.type' = 'filesystem', -- required: specify to connector type 'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory 'format.type' = 'csv', 'format.fields.0.name' = 'userId', -- required: define the schema either by using type information 'format.fields.0.type' = 'BIGINT', 'format.fields.1.name' = 'itemId', 'format.fields.1.type' = 'BIGINT', 'format.fields.2.name' = 'categoryId', 'format.fields.2.type' = 'BIGINT', 'format.fields.3.name' = 'behavior', 'format.fields.3.type' = 'VARCHAR', 'format.fields.4.name' = 'optime', 'format.fields.4.type' = 'BIGINT' );
Re: Flink实时数仓落Hive一般用哪种方式好?
Hi 帅, - 目前可以通过改写StreamingFileSink的方式来支持Parquet。 (但是目前StreamingFileSink支持ORC比较难) - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1] [1] https://issues.apache.org/jira/browse/FLINK-14249 Best, Jingsong Lee -- From:陈帅 Send Time:2019年12月8日(星期日) 10:04 To:user-zh@flink.apache.org Subject:Flink实时数仓落Hive一般用哪种方式好? 有人说直接写到HBase,再在Hive关联Hbase表 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白: 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC) 写的话,目前来看没有现成的Streaming Writer,官方提供的都是 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于 业务上的Update和Delete操作 数据一般是如何sync进Hive的? 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?
Re: Flink RetractStream如何转成AppendStream?
+1 to lucas.wu Best, Jingsong Lee -- From:lucas.wu Send Time:2019年12月9日(星期一) 11:39 To:user-zh Subject:Re: Flink RetractStream如何转成AppendStream? 可以使用类似的方式 // val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2) // val result5 = tEnv.fromDataStream(sstream) // result5.toAppendStream[Row].print() 原始邮件 发件人:Jark wuimj...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2019年12月8日(周日) 11:53 主题:Re: Flink RetractStream如何转成AppendStream? Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 2019 at 10:08, 陈帅 casel.c...@gmail.com wrote: 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
Re: Flink RetractStream如何转成AppendStream?
Hi 帅, 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。 Best, Jingsong Lee -- From:Jark Wu Send Time:2019年12月8日(星期日) 11:54 To:user-zh Subject:Re: Flink RetractStream如何转成AppendStream? Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 2019 at 10:08, 陈帅 wrote: > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka > > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka? >
Re: DML去重,translate时报错
Hi 叶贤勋: 现在去重现在支持insert into select 语法。 问题在于你的这个SQL怎么没产出UniqueKey 这里面可能有blink-planner的bug。 CC: @Jark Wu @godfrey he (JIRA) Best, Jingsong Lee -- From:叶贤勋 Send Time:2019年11月21日(星期四) 16:20 To:user-zh@flink.apache.org Subject:DML去重,translate时报错 Hi 大家好: Flink版本1.9.0, SQL1: CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' ); SQL2: CREATE TABLE user_dist ( dt VARCHAR, user_id VARCHAR, behavior VARCHAR ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', 'connector.table' = 'user_behavior_dup', 'connector.username' = 'root', 'connector.password' = ‘**', 'connector.write.flush.max-rows' = '1' ); SQL3: INSERT INTO user_dist SELECT dt, user_id, behavior FROM ( SELECT dt, user_id, behavior, ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc ) AS rownum FROM (select DATE_FORMAT(ts, '-MM-dd HH:00') as dt,user_id,behavior,PROCTIME() as proc from user_log) ) WHERE rownum = 1; 在对SQL3执行tableEnv.sqlUpdate(SQL3)时,报错: Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348) 请问去重现在不支持insert into select 语法吗? | | 叶贤勋 | | yxx_c...@163.com | 签名由网易邮箱大师定制
Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错
override getResultType方法,返回Types.SQL_TIMESTAMP. 这样应该可以绕过。 1.10会修复这个问题。 Best, Jingsong Lee -- From:守护 <346531...@qq.com> Send Time:2019年9月5日(星期四) 12:11 To:user-zh@flink.apache.org JingsongLee ; user-zh Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错 在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下: import org.apache.flink.table.functions.ScalarFunction; import java.sql.Timestamp; public class UTC2Local extends ScalarFunction { public Timestamp eval(Timestamp s) { long timestamp = s.getTime() + 2880; return new Timestamp(timestamp); } } -- 原始邮件 -- 发件人: "JingsongLee"; 发送时间: 2019年9月5日(星期四) 中午11:55 收件人: "user-zh"; 主题: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错 你声明了DataType吗?代码怎么写的? 由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。 Best, Jingsong Lee -- From:守护 <346531...@qq.com> Send Time:2019年9月5日(星期四) 11:48 To:user-zh Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错 社区的各位大佬好: 使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not support dataType: TIMESTAMP(9) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9) at org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248) at org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661) at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669) at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665) at org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687) at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Re: 回复: 关于Flink SQL DISTINCT问题
一般是按时间(比如天)来group by,state配置了超时过期的时间。 基本的去重方式就是靠state(比如RocksDbState)。 有mini-batch来减少对state的访问。 如果有倾斜,那是解倾斜问题的话题了。 Best, Jingsong Lee -- From:lvwenyuan Send Time:2019年9月4日(星期三) 15:11 To:user-zh Subject:Re:回复: 关于Flink SQL DISTINCT问题 对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式 在 2019-09-04 14:38:29,"athlon...@gmail.com" 写道: >在窗口内去重吧,不可能无限保留去重数据的 > > > >athlon...@gmail.com > >发件人: lvwenyuan >发送时间: 2019-09-04 14:28 >收件人: user-zh >主题: 关于Flink SQL DISTINCT问题 >各位大佬好: > 我想问下,关于flink sql的实时去重,就是count(distinct user_id) > 。就是Flink内部是如何做到实时去重,如果对于数据量比较大的时候实时去重,是否会有性能问题。用的Blink Planner
Re: Flink SQL 时间问题
Hi: 1.是的,目前只能是UTC,如果你有计算要求,你可以考虑改变的业务的窗口时间。 2.支持long的,你输入是不是int才会报错的,具体报错的信息? Best, Jingsong Lee -- From:hb <343122...@163.com> Send Time:2019年9月3日(星期二) 10:44 To:user-zh Subject:Flink SQL 时间问题 使用kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table ``` ... schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime() schema .field("_rowtime", Types.SQL_TIMESTAMP()) .rowtime( new Rowtime() .timestampsFromField("eventTime") .watermarksPeriodicBounded(1000) ) ``` 问题1. 生成的 _proctime 处理时间字段, 结果显示的时区是UTC, 怎么调整成 +8时区. 问题2. eventTime 事件时间字段怎么支持Long类型. 我输入到kafka记录为 {"eventTime": 10, "id":1,"name":"hb"} 会提示 eventTime 字段类型问题
Re: [Discuss] What should the "Data Source" be translated into Chinese
可以直接保留不用翻译吗? Best, Jingsong Lee -- From:WangHengwei Send Time:2019年8月13日(星期二) 11:50 To:user-zh Subject:[Discuss] What should the "Data Source" be translated into Chinese Hi all, I'm working on [FLINK-13405] Translate "Basic API Concepts" page into Chinese. I have a problem. Usually we translate "Data Source" into "数据源" but there is no agreed translation for "Data Sink". Since it often appears in documents, I think we'd better to have a unified translation. I have some alternatives, e.g. "数据沉淀","数据归" or "数据终". Committer Xingcan Cui has a good suggestion for "数据汇" which corresponds to source ("数据源"). I asked Committer Jark Wu, he is also fine with it. I think "数据汇" is a good representation of flow charactiristics so I would like to use it. I want to hear more thoughts from the community whether we should translate it and what it should be translated into. Thanks, WangHW