Re: Re: filesystem connector不支持跨subtask合并小文件
你好, 生成的三个文件挺小的,不到2kb,1k多一点,配这个是为了合并后比2k大 lixin58...@163.com 发件人: Rui Li 发送时间: 2021-08-05 11:42 收件人: user-zh 主题: Re: filesystem connector不支持跨subtask合并小文件 你好, 看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么 On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com wrote: > 你好, > 在使用filesystem > connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢? > > create table fs_parquet_compact > (userid bigint, name string, part string) > PARTITIONED BY (part) > with( > 'connector' = 'filesystem', > 'path' = 'hdfs:///data/fs_parquet_compact', > 'format' = 'parquet', > 'auto-compaction' = 'true', > 'compaction.file-size' = '2kb', > 'sink.rolling-policy.file-size' = '500b', > 'sink.rolling-policy.rollover-interval' = '800s', > 'sink.rolling-policy.check-interval' = '60s' > ); > > > > lixin58...@163.com > -- Best regards! Rui Li
?????? flink 1.13.1 ????hive??????????hive sql????????
SqlParser.parseStmtlist()sqlSqlNode toString()??unicode ---- ??: "user-zh"
flink sql sink到hbase报错hbase版本问题
请教各位大佬一个问题, 使用flink sql sink数据到hbase (flink版本 1.13.1 hbase版本2.2.6) 提交任务后,一直报错误是 java.lang.RuntimeException: hbase-default.xml file seems to be for an older version of HBase (2.2.3), this version is 2.2.6 已经在连接器参数里面配置了 'properties.hbase.defaults.for.version.skip'='true', hbase-default.xml也配置了跳过版本检查还是不行。。。 请问下,这个2.2.3这个数字是怎么来的呢? 该如何解决这个问题呢? 提前感谢各位大佬 部分异常信息如下: Table options are: 'connector'='hbase-2.2' 'properties.hbase.defaults.for.version.skip'='true' 'table-name'='' 'zookeeper.quorum'='xxx:xxx' at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171) at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) at net.matomo.impl.FlinkCDCVisitorChannel.(FlinkCDCVisitorChannel.java:73) at net.matomo.impl.FlinkCDCVisitorChannel.main(FlinkCDCVisitorChannel.java:86) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 8 more Caused by: java.lang.RuntimeException: hbase-default.xml file seems to be for an older version of HBase (2.2.3), this version is 2.2.6 at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:74) at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:84) at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:98) at org.apache.flink.connector.hbase.util.HBaseConfigurationUtil.getHBaseConfiguration(HBaseConfigurationUtil.java:49) at org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration(HBaseOptions.java:192) at org.apache.flink.connector.hbase2.HBase2DynamicTableFactory.createDynamicTableSink(HBase2DynamicTableFactory.java:101) at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168) ... 33 more
Re: filesystem connector不支持跨subtask合并小文件
你好, 看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么 On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com wrote: > 你好, > 在使用filesystem > connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢? > > create table fs_parquet_compact > (userid bigint, name string, part string) > PARTITIONED BY (part) > with( > 'connector' = 'filesystem', > 'path' = 'hdfs:///data/fs_parquet_compact', > 'format' = 'parquet', > 'auto-compaction' = 'true', > 'compaction.file-size' = '2kb', > 'sink.rolling-policy.file-size' = '500b', > 'sink.rolling-policy.rollover-interval' = '800s', > 'sink.rolling-policy.check-interval' = '60s' > ); > > > > lixin58...@163.com > -- Best regards! Rui Li
Re: sql hints 在寫入透過 hive 創建的表時沒有作用
Hi Caizhi, 我測試了 sink.rolling-policy.rollover-interval 這個配置,並且改使用 csv hive table 作為 sink table,結果是符合預期的。再次謝謝你的幫忙。 Tony Wei 於 2021年8月5日 週四 上午10:40寫道: > Hi, > > 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是 > 單純因為這個配置比較好觀察結果。 > 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。 > > Caizhi Weng 於 2021年8月5日 週四 上午10:36寫道: > >> Hi! >> >> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。 >> >> Tony Wei 于2021年8月5日周四 上午10:21写道: >> >> > Hi Experts, >> > >> > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。 >> > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度) >> > >> > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。 >> > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。 >> > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。 >> > >> > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。 >> > >> > CREATE CATALOG MyCatalog >> >> WITH ( >> >> 'type' = 'hive', >> >> 'hive-conf-dir' = '/etc/hive/conf', >> >> 'hadoop-conf-dir' = '/etc/hadoop/conf' >> >> ); >> >> >> > >> > >> > CREATE TABLE gen_users ( >> >> name STRING, >> >> age INT >> >> ) WITH ( >> >> 'connector' = 'datagen', >> >> 'rows-per-second' = '50' >> >> ); >> > >> > >> > >> > insert into `MyCatalog`.`default`.`user_hive_2` /*+ >> >> OPTIONS('sink.parallelism'='2') */ select * from gen_users; >> > >> > >> > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png] >> > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png] >> > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png] >> > >> > >> >
Re: sql hints 在寫入透過 hive 創建的表時沒有作用
Hi, 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是 單純因為這個配置比較好觀察結果。 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。 Caizhi Weng 於 2021年8月5日 週四 上午10:36寫道: > Hi! > > 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。 > > Tony Wei 于2021年8月5日周四 上午10:21写道: > > > Hi Experts, > > > > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。 > > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度) > > > > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。 > > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。 > > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。 > > > > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。 > > > > CREATE CATALOG MyCatalog > >> WITH ( > >> 'type' = 'hive', > >> 'hive-conf-dir' = '/etc/hive/conf', > >> 'hadoop-conf-dir' = '/etc/hadoop/conf' > >> ); > >> > > > > > > CREATE TABLE gen_users ( > >> name STRING, > >> age INT > >> ) WITH ( > >> 'connector' = 'datagen', > >> 'rows-per-second' = '50' > >> ); > > > > > > > > insert into `MyCatalog`.`default`.`user_hive_2` /*+ > >> OPTIONS('sink.parallelism'='2') */ select * from gen_users; > > > > > > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png] > > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png] > > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png] > > > > >
Re: sql hints 在寫入透過 hive 創建的表時沒有作用
Hi! 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。 Tony Wei 于2021年8月5日周四 上午10:21写道: > Hi Experts, > > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。 > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度) > > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。 > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。 > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。 > > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。 > > CREATE CATALOG MyCatalog >> WITH ( >> 'type' = 'hive', >> 'hive-conf-dir' = '/etc/hive/conf', >> 'hadoop-conf-dir' = '/etc/hadoop/conf' >> ); >> > > > CREATE TABLE gen_users ( >> name STRING, >> age INT >> ) WITH ( >> 'connector' = 'datagen', >> 'rows-per-second' = '50' >> ); > > > > insert into `MyCatalog`.`default`.`user_hive_2` /*+ >> OPTIONS('sink.parallelism'='2') */ select * from gen_users; > > > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png] > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png] > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png] > >
sql hints 在寫入透過 hive 創建的表時沒有作用
Hi Experts, 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度) 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。 CREATE CATALOG MyCatalog > WITH ( > 'type' = 'hive', > 'hive-conf-dir' = '/etc/hive/conf', > 'hadoop-conf-dir' = '/etc/hadoop/conf' > ); > CREATE TABLE gen_users ( > name STRING, > age INT > ) WITH ( > 'connector' = 'datagen', > 'rows-per-second' = '50' > ); insert into `MyCatalog`.`default`.`user_hive_2` /*+ > OPTIONS('sink.parallelism'='2') */ select * from gen_users; [image: Screen Shot 2021-08-04 at 4.33.20 PM.png] [image: Screen Shot 2021-08-04 at 4.33.32 PM.png] [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
Re: flink table over 窗口报错
好的 谢谢答疑 我这边尝试把时间字段设置成processTime的时候就好使了 抽空是看看源码分析下 在 2021年8月4日 21:01,Shengkai Fang 写道: 我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 yanyunpeng 于2021年8月4日周三 下午5:42写道: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) > at > org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > > Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > > > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 > 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread > "main" org.apache.flink.table.api.ValidationException: > Ordering must be > defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > >
Re: flink table over 窗口报错
我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 yanyunpeng 于2021年8月4日周三 下午5:42写道: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) > at > org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) > at java.util.Optional.orElseGet(Optional.java:267) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) > at > org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) > at > org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) > at > org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) > at > org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > > Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > > > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 > 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread > "main" org.apache.flink.table.api.ValidationException: > Ordering must be > defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > > '123',\n" + > " 'password' = '123'\n" + > ")"); > > >
(无主题)
退订
Re: flink table over 窗口报错
Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111) at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88) at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89) at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235) at java.util.function.Function.lambda$andThen$1(Function.java:88) at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198) at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194) at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183) at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956) at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) Flink 版本1.13.0 在 2021年8月4日 17:37,Shengkai Fang 写道: 能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > '123',\n" + > " 'password' = '123'\n" + > ")"); > > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) >
filesystem connector不支持跨subtask合并小文件
你好, 在使用filesystem connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢? create table fs_parquet_compact (userid bigint, name string, part string) PARTITIONED BY (part) with( 'connector' = 'filesystem', 'path' = 'hdfs:///data/fs_parquet_compact', 'format' = 'parquet', 'auto-compaction' = 'true', 'compaction.file-size' = '2kb', 'sink.rolling-policy.file-size' = '500b', 'sink.rolling-policy.rollover-interval' = '800s', 'sink.rolling-policy.check-interval' = '60s' ); lixin58...@163.com
Re: flink table over 窗口报错
能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > '123',\n" + > " 'password' = '123'\n" + > ")"); > > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > > Table table = tableEnv > .from("t_yyp_test") > > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > > $("f_j"), > $("f_value").avg().over($("w")), > > $("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime > 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" > org.apache.flink.table.api.ValidationException: > Ordering must be defined > on a time attribute. > > > 请问这是什么原因
Re: Flink sql 维表聚合问题请教
额...,说的太对了, batch任务没问题,但流任务就发生意想不到的问题. 该需求就是翻译原离线SQL(传统数仓), 现要改成实时分析. 结果发现有些需求好像实现不了 非常感谢! > 在 2021年8月4日,16:50,黑色 写道: > > 你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的 > 当然你要是batch来了,没问题 > > > > > --原始邮件-- > 发件人: > "user-zh" > > 发送时间:2021年8月4日(星期三) 下午4:44 > 收件人:"user-zh" > 主题:Re: Flink sql 维表聚合问题请教 > > > > Hi! > > 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal > join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。 > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join > > carlc > 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。 > > create view v_bl_user_count as ( > select user_id, count(1) > from mysql_user_blacklist > group by user_id > ); > > select t1.`user_id` > , t1.`event_type` > , t1.`current_ts` > from kafka_user_event t1 > left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on > t1.`user_id` = t2.`user_id` > where t1.`event_type` = ‘LOGIN’ > > 异常信息: > org.apache.flink.table.api.TableException: Processing-time temporal join > is not supported yet. > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > > > > > 在 2021年8月4日,14:18,Caizhi Weng > Hi! > > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 > > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time > temporal > table join 了。 > > carlc > 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > > -- 模拟需求(有点牵强...): > -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 > mysql_user_blacklist > 统计对应 user_id 在维表中的次数 - 即: 在维表上做聚合操作 > > -- 1. 创建user_blacklist表 > CREATE TABLE `user_blacklist` ( > `user_id` bigint(20) NOT NULL, > `create_time` datetime NOT NULL, > PRIMARY KEY (`user_id`,`create_time`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > INSERT INTO user_blacklist (`user_id`, `create_time`) > VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), > (2,'2021-01-04 00:00:00'); > > -- 2. 模拟kafka数据: > -- 第1条: > {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 > 00:00:00"} > -- 第2条: > {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 > 00:00:00"} > > -- 操作步骤: > 当发送第1条kafka数据得到如下输出: > | OP| user_id| event_type | current_ts| bl_count | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | > 当再次发送第1条kafka数据得到如下输出: > | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | > > — SQL 如下: > > create table kafka_user_event > ( > `user_id` BIGINT, > `event_type` STRING, > `current_ts` timestamp(3), > `proc_time` AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > ... > ); > > create table mysql_user_blacklist > ( > user_id BIGINT, > create_time timestamp(3), > primary key (user_id,create_time) not enforced > ) WITH ( > 'connector' = 'jdbc', > … > ); > > create view v2_user_event as ( > select t1.`user_id` > , t1.`event_type` > , t1.`current_ts` > , count(1) over ( partition by t2.`user_id` order by > t1.`proc_time` ROWS > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count > from kafka_user_event t1 > left join mysql_user_blacklist FOR SYSTEM_TIME AS OF > t1.`proc_time` AS > t2 > on t1.`user_id` = t2.`user_id` > where t1.`event_type` = 'LOGIN' > ); > > select * from v2_user_event; > > > >
Re: 1.14啥时候出呀
1.14还有1-2个月 1.13.2马上就出了,估计明天或后天或周一 On Wed, Aug 4, 2021 at 4:48 PM yidan zhao wrote: > 如题,1.14或1.13.2啥时候出呀,有人知道吗。 > -- Best, Jingsong Lee
?????? Flink sql ????????????????
??lookup??on??key,?? ??batch ---- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join carlc
1.14啥时候出呀
如题,1.14或1.13.2啥时候出呀,有人知道吗。
Re: Flink sql 维表聚合问题请教
Hi! 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join carlc 于2021年8月4日周三 下午3:57写道: > 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。 > > create view v_bl_user_count as ( > select user_id, count(1) > from mysql_user_blacklist > group by user_id > ); > > select t1.`user_id` > , t1.`event_type` > , t1.`current_ts` > from kafka_user_event t1 > left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on > t1.`user_id` = t2.`user_id` > where t1.`event_type` = ‘LOGIN’ > > 异常信息: > org.apache.flink.table.api.TableException: Processing-time temporal join > is not supported yet. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > > > > > > 在 2021年8月4日,14:18,Caizhi Weng 写道: > > > > Hi! > > > > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 > > > > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal > > table join 了。 > > > > carlc 于2021年8月4日周三 上午10:41写道: > > > >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > >> > >> -- 模拟需求(有点牵强...): > >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 > mysql_user_blacklist > >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 > >> > >> -- 1. 创建user_blacklist表 > >> CREATE TABLE `user_blacklist` ( > >> `user_id` bigint(20) NOT NULL, > >> `create_time` datetime NOT NULL, > >> PRIMARY KEY (`user_id`,`create_time`) > >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > >> INSERT INTO user_blacklist (`user_id`, `create_time`) > >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), > >> (2,'2021-01-04 00:00:00'); > >> > >> -- 2. 模拟kafka数据: > >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 > >> 00:00:00"} > >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 > >> 00:00:00"} > >> > >> -- 操作步骤: > >> 当发送第1条kafka数据得到如下输出: > >> | OP| user_id| event_type | current_ts| bl_count | > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | > >> 当再次发送第1条kafka数据得到如下输出: > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | > >> > >> — SQL 如下: > >> > >> create table kafka_user_event > >> ( > >> `user_id` BIGINT, > >> `event_type` STRING, > >> `current_ts` timestamp(3), > >> `proc_time` AS PROCTIME() > >> ) WITH ( > >> 'connector' = 'kafka', > >> ... > >> ); > >> > >> create table mysql_user_blacklist > >> ( > >> user_id BIGINT, > >> create_time timestamp(3), > >> primary key (user_id,create_time) not enforced > >> ) WITH ( > >> 'connector' = 'jdbc', > >> … > >> ); > >> > >> create view v2_user_event as ( > >> select t1.`user_id` > >> , t1.`event_type` > >> , t1.`current_ts` > >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS > >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count > >> from kafka_user_event t1 > >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS > t2 > >> on t1.`user_id` = t2.`user_id` > >> where t1.`event_type` = 'LOGIN' > >> ); > >> > >> select * from v2_user_event; > >> > >> > >
Re: Flink sql 维表聚合问题请教
感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。 create view v_bl_user_count as ( select user_id, count(1) from mysql_user_blacklist group by user_id ); select t1.`user_id` , t1.`event_type` , t1.`current_ts` from kafka_user_event t1 left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on t1.`user_id` = t2.`user_id` where t1.`event_type` = ‘LOGIN’ 异常信息: org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > 在 2021年8月4日,14:18,Caizhi Weng 写道: > > Hi! > > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 > > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal > table join 了。 > > carlc 于2021年8月4日周三 上午10:41写道: > >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ >> >> -- 模拟需求(有点牵强...): >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 >> >> -- 1. 创建user_blacklist表 >> CREATE TABLE `user_blacklist` ( >> `user_id` bigint(20) NOT NULL, >> `create_time` datetime NOT NULL, >> PRIMARY KEY (`user_id`,`create_time`) >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; >> INSERT INTO user_blacklist (`user_id`, `create_time`) >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), >> (2,'2021-01-04 00:00:00'); >> >> -- 2. 模拟kafka数据: >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 >> 00:00:00"} >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 >> 00:00:00"} >> >> -- 操作步骤: >> 当发送第1条kafka数据得到如下输出: >> | OP| user_id| event_type | current_ts| bl_count | >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | >> 当再次发送第1条kafka数据得到如下输出: >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | >> >> — SQL 如下: >> >> create table kafka_user_event >> ( >> `user_id` BIGINT, >> `event_type` STRING, >> `current_ts` timestamp(3), >> `proc_time` AS PROCTIME() >> ) WITH ( >> 'connector' = 'kafka', >> ... >> ); >> >> create table mysql_user_blacklist >> ( >> user_id BIGINT, >> create_time timestamp(3), >> primary key (user_id,create_time) not enforced >> ) WITH ( >> 'connector' = 'jdbc', >> … >> ); >> >> create view v2_user_event as ( >> select t1.`user_id` >> , t1.`event_type` >> , t1.`current_ts` >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count >> from kafka_user_event t1 >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 >> on t1.`user_id` = t2.`user_id` >> where t1.`event_type` = 'LOGIN' >> ); >> >> select * from v2_user_event; >> >>
Re: 几个Flink 1.12. 2超时问题
目前是在所有taskmanager容器都成功启动之后,才出现的timeout,所以不可能是调度层面的问题。 目前我们在网络层面使用的是生产环境的网络,该环境被用于跑大量的生产流量,也没有其他容器反馈过类似问题。 我目前还是比较怀疑flink本身的某个配置导致了这个现象,请问flink是否有相关的metrics或日志可以参考? On 2021/8/4, 11:50 AM, "东东" wrote: 应该可以从两个层面查一下: 1、调度层面。native application是先启动JM容器,然后由JM容器与K8s交互拉起TM的,可以看一下K8s日志,看看整个流程是否有瓶颈点,比如镜像的拉取,TM容器的启动之类。 2、网络层面。如果调度没有问题,各容器启动的过程和速度都很正常,那就要看网络层面是否存在瓶颈,必要的时候可以tcpdump一下。 在 2021-08-03 14:02:53,"Chenyu Zheng" 写道: 开发者您好, 我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。 在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。 我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢? 附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。 谢谢!
Re: flink 1.13.1 使用hive方言,执行hive sql解析报错
用1.1.0试了一下也没复现,你insert语句中的中文如果换成英文试试看解析能不能过呢 On Mon, Aug 2, 2021 at 3:05 PM Asahi Lee <978466...@qq.com.invalid> wrote: > hive 1.1.0版本 > > > > > --原始邮件-- > 发件人: > "user-zh" > < > lirui.fu...@gmail.com; > 发送时间:2021年8月2日(星期一) 中午12:23 > 收件人:"user-zh" > 主题:Re: flink 1.13.1 使用hive方言,执行hive sql解析报错 > > > > 我本地试了一下没有复现你的问题,你的hive版本是什么呢? > > On Fri, Jul 30, 2021 at 3:00 PM Asahi Lee <978466...@qq.com.invalid > wrote: > > CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramvalue`( > nbsp; `paramvalue_id` string COMMENT '',nbsp; > nbsp; `platform_id` string COMMENT '',nbsp; > nbsp; `equipment_id` string COMMENT '',nbsp; > nbsp; `param_id` string COMMENT '',nbsp; > nbsp; `param_value` string COMMENT '',nbsp; > nbsp; `remark` string COMMENT '',nbsp; > nbsp; `create_time` string COMMENT '',nbsp; > nbsp; `creator` string COMMENT '',nbsp; > nbsp; `update_time` string COMMENT '',nbsp; > nbsp; `update_person` string COMMENT '',nbsp; > nbsp; `record_flag` double COMMENT '',nbsp; > nbsp; `subject_id` string COMMENT '',nbsp; > nbsp; `output_unit` string COMMENT '',nbsp; > nbsp; `show_seq` double COMMENT '') > COMMENT '' > ROW FORMAT SERDEnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'nbsp; > WITH SERDEPROPERTIES (nbsp; > nbsp; 'field.delim'=',',nbsp; > nbsp; 'serialization.format'=',')nbsp; > STORED AS INPUTFORMATnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat'nbsp; > OUTPUTFORMATnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > LOCATION > nbsp; > > 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramvalue' > TBLPROPERTIES ( > nbsp; 'COLUMN_STATS_ACCURATE'='false',nbsp; > nbsp; 'last_modified_by'='root',nbsp; > nbsp; 'last_modified_time'='1621834335',nbsp; > nbsp; 'numFiles'='0',nbsp; > nbsp; 'numRows'='-1',nbsp; > nbsp; 'rawDataSize'='-1',nbsp; > nbsp; 'totalSize'='0',nbsp; > nbsp; 'transient_lastDdlTime'='1621834335') > > > > CREATE TABLE `cosldatacenter.ods_emp_md_large_equip`( > nbsp; `large_equip_id` string COMMENT '',nbsp; > nbsp; `equip_name` string COMMENT '',nbsp; > nbsp; `equip_type` string COMMENT '',nbsp; > nbsp; `equip_function` string COMMENT '',nbsp; > nbsp; `equip_board` string COMMENT '',nbsp; > nbsp; `ship_yard` string COMMENT '',nbsp; > nbsp; `manufacturer_date` string COMMENT '',nbsp; > nbsp; `enqueue_date` string COMMENT '',nbsp; > nbsp; `dockrepair_date` string COMMENT '',nbsp; > nbsp; `scrap_date` string COMMENT '',nbsp; > nbsp; `enqueue_mode` string COMMENT '',nbsp; > nbsp; `work_for_org` string COMMENT '',nbsp; > nbsp; `work_in_org` string COMMENT '',nbsp; > nbsp; `old_age` string COMMENT '',nbsp; > nbsp; `create_time` date COMMENT '',nbsp; > nbsp; `creator` string COMMENT '',nbsp; > nbsp; `update_time` date COMMENT '',nbsp; > nbsp; `update_person` string COMMENT '',nbsp; > nbsp; `record_flag` double COMMENT '',nbsp; > nbsp; `data_timestamp` string COMMENT '',nbsp; > nbsp; `work_unit_id` string COMMENT '',nbsp; > nbsp; `work_status` string COMMENT '',nbsp; > nbsp; `work_location` string COMMENT '',nbsp; > nbsp; `work_area` string COMMENT '',nbsp; > nbsp; `equip_code` string COMMENT '',nbsp; > nbsp; `shi_main_power` double COMMENT '',nbsp; > nbsp; `shi_total_len` double COMMENT '',nbsp; > nbsp; `shi_type_width` double COMMENT '',nbsp; > nbsp; `shi_type_depth` double COMMENT '',nbsp; > nbsp; `shi_design_draft` double COMMENT '',nbsp; > nbsp; `shi_total_tonnage` double COMMENT '',nbsp; > nbsp; `shi_load_tonnage` double COMMENT '',nbsp; > nbsp; `remark` string COMMENT '',nbsp; > nbsp; `unit_classification1` string COMMENT '',nbsp; > nbsp; `unit_classification2` string COMMENT '') > COMMENT '' > ROW FORMAT SERDEnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'nbsp; > WITH SERDEPROPERTIES (nbsp; > nbsp; 'field.delim'=',',nbsp; > nbsp; 'serialization.format'=',')nbsp; > STORED AS INPUTFORMATnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io > .orc.OrcInputFormat'nbsp; > OUTPUTFORMATnbsp; > nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' > LOCATION > nbsp; > > 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_md_large_equip' > TBLPROPERTIES ( > nbsp; 'COLUMN_STATS_ACCURATE'='false',nbsp; > nbsp; 'last_modified_by'='root',nbsp; > nbsp; 'last_modified_time'='1621834338',nbsp; > nbsp; 'numFiles'='0',nbsp; > nbsp; 'numRows'='-1',nbsp; > nbsp; 'rawDataSize'='-1',nbsp; > nbsp; 'totalSize'='0',nbsp; > nbsp; 'transient_lastDdlTime'='1621834338') > > > > CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramdef`( > nbsp; `param_id` string COMMENT '',nbsp; > nbsp; `iadc_id` string COMMENT '',nbsp; > nbsp; `param_code` string COMMENT '',nbsp; > nbsp; `param_en` string COMMENT '',nbsp; > nbsp; `param_cn` string COMMENT '',nbsp; > nbsp; `output_standard`
Re: flink table over 窗口报错
Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_value"), $("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 也是一样的 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 在 2021年8月4日 14:34,Caizhi Weng 写道: Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
Re: flink table over 窗口报错
Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
flink table over 窗口报错
代码如下: EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(bbSettings); tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + " f_id INT,\n" + " f_h STRING,\n" + " f_l STRING,\n" + " f_j STRING,\n" + " f_value DOUBLE,\n" + " f_time TIMESTAMP(3)\n, " + " f_time_bak TIMESTAMP(3)\n, " + " PRIMARY KEY (f_id) NOT ENFORCED,\n" + " WATERMARK FOR f_time AS f_time \n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***',\n" + " 'table-name' = '123',\n" + " 'username' = '123',\n" + " 'password' = '123'\n" + ")"); tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time_bak")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 请问这是什么原因
Re: Flink sql 维表聚合问题请教
Hi! 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal table join 了。 carlc 于2021年8月4日周三 上午10:41写道: > 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > > -- 模拟需求(有点牵强...): > -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist > 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 > > -- 1. 创建user_blacklist表 > CREATE TABLE `user_blacklist` ( > `user_id` bigint(20) NOT NULL, > `create_time` datetime NOT NULL, > PRIMARY KEY (`user_id`,`create_time`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > INSERT INTO user_blacklist (`user_id`, `create_time`) > VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), > (2,'2021-01-04 00:00:00'); > > -- 2. 模拟kafka数据: > -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 > 00:00:00"} > -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 > 00:00:00"} > > -- 操作步骤: > 当发送第1条kafka数据得到如下输出: > | OP| user_id| event_type | current_ts| bl_count | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | > 当再次发送第1条kafka数据得到如下输出: > | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | > > — SQL 如下: > > create table kafka_user_event > ( > `user_id` BIGINT, > `event_type` STRING, > `current_ts` timestamp(3), > `proc_time` AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > ... > ); > > create table mysql_user_blacklist > ( > user_id BIGINT, > create_time timestamp(3), > primary key (user_id,create_time) not enforced > ) WITH ( > 'connector' = 'jdbc', > … > ); > > create view v2_user_event as ( > select t1.`user_id` > , t1.`event_type` > , t1.`current_ts` > , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count > from kafka_user_event t1 > left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 > on t1.`user_id` = t2.`user_id` > where t1.`event_type` = 'LOGIN' > ); > > select * from v2_user_event; > >