Re: Re: filesystem connector不支持跨subtask合并小文件

2021-08-04 文章 lixin58...@163.com
你好,
生成的三个文件挺小的,不到2kb,1k多一点,配这个是为了合并后比2k大



lixin58...@163.com
 
发件人: Rui Li
发送时间: 2021-08-05 11:42
收件人: user-zh
主题: Re: filesystem connector不支持跨subtask合并小文件
你好,
 
看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么
 
On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com 
wrote:
 
> 你好,
> 在使用filesystem
> connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?
>
> create table fs_parquet_compact
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet_compact',
> 'format' = 'parquet',
> 'auto-compaction' = 'true',
> 'compaction.file-size' = '2kb',
> 'sink.rolling-policy.file-size' = '500b',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
> lixin58...@163.com
>
 
 
-- 
Best regards!
Rui Li


?????? flink 1.13.1 ????hive??????????hive sql????????

2021-08-04 文章 Asahi Lee
SqlParser.parseStmtlist()sqlSqlNode 
toString()??unicode




----
??: 
   "user-zh"



flink sql sink到hbase报错hbase版本问题

2021-08-04 文章 tomhan_th
请教各位大佬一个问题,
使用flink sql sink数据到hbase  (flink版本 1.13.1  hbase版本2.2.6)
提交任务后,一直报错误是 java.lang.RuntimeException: hbase-default.xml file seems to be for 
an older version of HBase (2.2.3), this version is 2.2.6


已经在连接器参数里面配置了 'properties.hbase.defaults.for.version.skip'='true',
hbase-default.xml也配置了跳过版本检查还是不行。。。


请问下,这个2.2.3这个数字是怎么来的呢? 该如何解决这个问题呢?
提前感谢各位大佬




部分异常信息如下:


Table options are:


'connector'='hbase-2.2'
'properties.hbase.defaults.for.version.skip'='true'
'table-name'=''
'zookeeper.quorum'='xxx:xxx'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at net.matomo.impl.FlinkCDCVisitorChannel.(FlinkCDCVisitorChannel.java:73)
at net.matomo.impl.FlinkCDCVisitorChannel.main(FlinkCDCVisitorChannel.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to be for 
an older version of HBase (2.2.3), this version is 2.2.6
at 
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:74)
at 
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:84)
at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:98)
at 
org.apache.flink.connector.hbase.util.HBaseConfigurationUtil.getHBaseConfiguration(HBaseConfigurationUtil.java:49)
at 
org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration(HBaseOptions.java:192)
at 
org.apache.flink.connector.hbase2.HBase2DynamicTableFactory.createDynamicTableSink(HBase2DynamicTableFactory.java:101)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
... 33 more

Re: filesystem connector不支持跨subtask合并小文件

2021-08-04 文章 Rui Li
你好,

看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么

On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com 
wrote:

> 你好,
> 在使用filesystem
> connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?
>
> create table fs_parquet_compact
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet_compact',
> 'format' = 'parquet',
> 'auto-compaction' = 'true',
> 'compaction.file-size' = '2kb',
> 'sink.rolling-policy.file-size' = '500b',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
> lixin58...@163.com
>


-- 
Best regards!
Rui Li


Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi Caizhi,

我測試了 sink.rolling-policy.rollover-interval 這個配置,並且改使用 csv hive table 作為
sink table,結果是符合預期的。再次謝謝你的幫忙。

Tony Wei  於 2021年8月5日 週四 上午10:40寫道:

> Hi,
>
> 感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
> 單純因為這個配置比較好觀察結果。
> 我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。
>
> Caizhi Weng  於 2021年8月5日 週四 上午10:36寫道:
>
>> Hi!
>>
>> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。
>>
>> Tony Wei  于2021年8月5日周四 上午10:21写道:
>>
>> > Hi Experts,
>> >
>> > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
>> > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
>> >
>> > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
>> > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
>> > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。
>> >
>> > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。
>> >
>> > CREATE CATALOG MyCatalog
>> >> WITH (
>> >>   'type' = 'hive',
>> >>   'hive-conf-dir' = '/etc/hive/conf',
>> >>   'hadoop-conf-dir' = '/etc/hadoop/conf'
>> >> );
>> >>
>> >
>> >
>> > CREATE TABLE gen_users (
>> >>   name STRING,
>> >>   age INT
>> >> ) WITH (
>> >>   'connector' = 'datagen',
>> >>   'rows-per-second' = '50'
>> >> );
>> >
>> >
>> >
>> > insert into `MyCatalog`.`default`.`user_hive_2` /*+
>> >> OPTIONS('sink.parallelism'='2') */ select * from gen_users;
>> >
>> >
>> > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
>> > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
>> > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
>> >
>> >
>>
>


Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi,

感謝指正,我的目的是為了測試 sql hints 是否生效,選擇 `sink.parallelism` 是
單純因為這個配置比較好觀察結果。
我會再嘗試其他 hive streaming sink 的配置測試看看。謝謝。

Caizhi Weng  於 2021年8月5日 週四 上午10:36寫道:

> Hi!
>
> 单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。
>
> Tony Wei  于2021年8月5日周四 上午10:21写道:
>
> > Hi Experts,
> >
> > 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
> > 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
> >
> > 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
> > 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
> > 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。
> >
> > 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。
> >
> > CREATE CATALOG MyCatalog
> >> WITH (
> >>   'type' = 'hive',
> >>   'hive-conf-dir' = '/etc/hive/conf',
> >>   'hadoop-conf-dir' = '/etc/hadoop/conf'
> >> );
> >>
> >
> >
> > CREATE TABLE gen_users (
> >>   name STRING,
> >>   age INT
> >> ) WITH (
> >>   'connector' = 'datagen',
> >>   'rows-per-second' = '50'
> >> );
> >
> >
> >
> > insert into `MyCatalog`.`default`.`user_hive_2` /*+
> >> OPTIONS('sink.parallelism'='2') */ select * from gen_users;
> >
> >
> > [image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
> > [image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
> > [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
> >
> >
>


Re: sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Caizhi Weng
Hi!

单独设置 sink 并发是 1.13 的新特性。可以升级到 1.13 试试看。

Tony Wei  于2021年8月5日周四 上午10:21写道:

> Hi Experts,
>
> 我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
> 我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)
>
> 我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
> 寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
> 同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。
>
> 請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。
>
> CREATE CATALOG MyCatalog
>> WITH (
>>   'type' = 'hive',
>>   'hive-conf-dir' = '/etc/hive/conf',
>>   'hadoop-conf-dir' = '/etc/hadoop/conf'
>> );
>>
>
>
> CREATE TABLE gen_users (
>>   name STRING,
>>   age INT
>> ) WITH (
>>   'connector' = 'datagen',
>>   'rows-per-second' = '50'
>> );
>
>
>
> insert into `MyCatalog`.`default`.`user_hive_2` /*+
>> OPTIONS('sink.parallelism'='2') */ select * from gen_users;
>
>
> [image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
> [image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
> [image: Screen Shot 2021-08-04 at 4.34.13 PM.png]
>
>


sql hints 在寫入透過 hive 創建的表時沒有作用

2021-08-04 文章 Tony Wei
Hi Experts,

我嘗試使用 sql hints 去覆寫 hive sink table 的配置,發現 sql hints 沒有生效。
我測試了修改 `sink.parallelism` 來調整寫入的並行度,但並行度仍舊為 `1`。(sql 任務配置的並行度)

我運行的環境版本是 Flink 1.12.2,底下是我在 flink sql client 的操作流程和結果的截圖。
寫入的 `user_hive_2` 表是透過 hive beeline 創建的,不是透過 flink sql ddl。
同時我也確認了 `table.dynamic-table-options.enabled` 的配置是啟用的。

請問是否是我在配置上或是使用上弄錯了什麼嗎?感謝解答。

CREATE CATALOG MyCatalog
> WITH (
>   'type' = 'hive',
>   'hive-conf-dir' = '/etc/hive/conf',
>   'hadoop-conf-dir' = '/etc/hadoop/conf'
> );
>


CREATE TABLE gen_users (
>   name STRING,
>   age INT
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '50'
> );



insert into `MyCatalog`.`default`.`user_hive_2` /*+
> OPTIONS('sink.parallelism'='2') */ select * from gen_users;


[image: Screen Shot 2021-08-04 at 4.33.20 PM.png]
[image: Screen Shot 2021-08-04 at 4.33.32 PM.png]
[image: Screen Shot 2021-08-04 at 4.34.13 PM.png]


Re: flink table over 窗口报错

2021-08-04 文章 yanyunpeng
好的 谢谢答疑  我这边尝试把时间字段设置成processTime的时候就好使了  抽空是看看源码分析下


在 2021年8月4日 21:01,Shengkai Fang 写道:


我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。 
yanyunpeng  于2021年8月4日周三 下午5:42写道: > Exception in 
thread "main" org.apache.flink.table.api.ValidationException: > Ordering must 
be defined on a time attribute. > at > 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
 > at > 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
 > at java.util.Optional.orElseGet(Optional.java:267) > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
 > at > 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
 > at > 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
 > at > 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
 > at > 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
 > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
 > at > 
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269) > at 
> 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) 
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at > 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
 > at > 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
 > at java.util.function.Function.lambda$andThen$1(Function.java:88) > at > 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
 > at > 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
 > at > 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
 > at > 
org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
 > at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44) > > > > 
> Flink 版本1.13.0 > > > 在 2021年8月4日 17:37,Shengkai Fang 写道: > 
> > 能发一下具体的异常栈吗?是哪个版本? yanyunpeng  于2021年8月4日周三 > 
下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > 
.orderBy($("f_time")) > .preceding("unbounded_range") > > 
.following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), > > 
$("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > > 
$("f_value").varPop().over($("w")), > > $("f_value").stddevPop().over($("w"))); 
> 也是一样的 > Exception in thread > "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be > defined on 
a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng< > tsreape...@gmail.com> 
写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark > 的字段是 f_time,这两个不一致。 
yanyunpeng < > yanyunp...@rockontrol.com> > 于2021年8月4日周三 下午2:30写道: > 代码如下: > > 
EnvironmentSettings bbSettings = > > > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > 
> TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > > 
f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > > 
DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > > 
" + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > > 
f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > > 

Re: flink table over 窗口报错

2021-08-04 文章 Shengkai Fang
我发现换成流模式下,这个问题就解了。批的模式下,应该不需要定义watermark。这里看样子是有一个 bug 存在的,建议去社区建一个jira。

yanyunpeng  于2021年8月4日周三 下午5:42写道:

> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Ordering must be defined on a time attribute.
> at
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
> at
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
> at
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
> at
> org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
> at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
>
>
>
>
> Flink 版本1.13.0
>
>
> 在 2021年8月4日 17:37,Shengkai Fang 写道:
>
>
> 能发一下具体的异常栈吗?是哪个版本? yanyunpeng  于2021年8月4日周三
> 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") >
> .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) >
> .orderBy($("f_time")) > .preceding("unbounded_range") >
> .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), > $("f_h"), >
> $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), >
> $("f_value").varPop().over($("w")), >
> $("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread
> "main" org.apache.flink.table.api.ValidationException: > Ordering must be
> defined on a time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng<
> tsreape...@gmail.com> 写道: > > > Hi! order by 的字段是 f_time_bak,但是 watermark
> 的字段是 f_time,这两个不一致。 yanyunpeng < > yanyunp...@rockontrol.com>
> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > EnvironmentSettings bbSettings = > >
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > >
> tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > "
> > f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value >
> DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n,
> > " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS
> > f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' =
> > 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' =
> > '123',\n" + > " 'password' = '123'\n" + > ")"); > >
> 

(无主题)

2021-08-04 文章 洗你的头
退订

Re: flink table over 窗口报错

2021-08-04 文章 yanyunpeng
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.
at 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
at 
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at 
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
at 
org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)




Flink 版本1.13.0


在 2021年8月4日 17:37,Shengkai Fang 写道:


能发一下具体的异常栈吗?是哪个版本? yanyunpeng  于2021年8月4日周三 
下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) 
> .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > 
.select($("f_value"), > $("f_h"), > $("f_l"), > $("f_j"), > 
$("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > 
$("f_value").stddevPop().over($("w"))); > 也是一样的 > Exception in thread "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be defined on a 
time attribute. > > 在 2021年8月4日 14:34,Caizhi Weng 写道: > > 
> Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng < > 
yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: > > 
EnvironmentSettings bbSettings = > > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > > 
TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " > 
f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value > 
DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, > " 
+ > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS > 
f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = > 
'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = > 
'123',\n" + > " 'password' = '123'\n" + > ")"); > > 
tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > > 
Table table = tableEnv > .from("t_yyp_test") > > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > > 
.orderBy($("f_time_bak")) > .preceding("unbounded_range") > > 
.following(CURRENT_RANGE) > .as("w")) > 

filesystem connector不支持跨subtask合并小文件

2021-08-04 文章 lixin58...@163.com
你好,
在使用filesystem 
connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?

create table fs_parquet_compact
(userid bigint, name string, part string)
PARTITIONED BY (part)
with(
'connector' = 'filesystem',
'path' = 'hdfs:///data/fs_parquet_compact',
'format' = 'parquet',
'auto-compaction' = 'true',
'compaction.file-size' = '2kb',
'sink.rolling-policy.file-size' = '500b',
'sink.rolling-policy.rollover-interval' = '800s',
'sink.rolling-policy.check-interval' = '60s'
);



lixin58...@163.com


Re: flink table over 窗口报错

2021-08-04 文章 Shengkai Fang
能发一下具体的异常栈吗?是哪个版本?

yanyunpeng  于2021年8月4日周三 下午2:47写道:

> Table table = tableEnv
>  .from("t_yyp_test")
>  .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
>  .orderBy($("f_time"))
>  .preceding("unbounded_range")
>  .following(CURRENT_RANGE)
>  .as("w"))
>  .select($("f_value"),
>  $("f_h"),
>  $("f_l"),
>  $("f_j"),
>  $("f_value").avg().over($("w")),
>  $("f_value").varPop().over($("w")),
>  $("f_value").stddevPop().over($("w")));
> 也是一样的
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Ordering must be defined on a time attribute.
>
> 在 2021年8月4日 14:34,Caizhi Weng 写道:
>
>
> Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng <
> yanyunp...@rockontrol.com> 于2021年8月4日周三 下午2:30写道: > 代码如下: >
> EnvironmentSettings bbSettings = >
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); >
> tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > "
> f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value
> DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n,
> " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS
> f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' =
> 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' =
> '123',\n" + > " 'password' = '123'\n" + > ")"); >
> tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); >
> Table table = tableEnv > .from("t_yyp_test") >
> .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) >
> .orderBy($("f_time_bak")) > .preceding("unbounded_range") >
> .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), >
> $("f_j"), > $("f_value").avg().over($("w")), >
> $("f_value").varPop().over($("w")), >
> $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime
> 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main"
> org.apache.flink.table.api.ValidationException: > Ordering must be defined
> on a time attribute. > > > 请问这是什么原因


Re: Flink sql 维表聚合问题请教

2021-08-04 文章 carlc
额...,说的太对了,  batch任务没问题,但流任务就发生意想不到的问题.

该需求就是翻译原离线SQL(传统数仓), 现要改成实时分析. 结果发现有些需求好像实现不了

非常感谢!



> 在 2021年8月4日,16:50,黑色  写道:
> 
> 你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的
> 当然你要是batch来了,没问题
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2021年8月4日(星期三) 下午4:44
> 收件人:"user-zh" 
> 主题:Re: Flink sql 维表聚合问题请教
> 
> 
> 
> Hi!
> 
> 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
> join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
> 
> carlc  
>  感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
> 
>  create view v_bl_user_count as (
>  select user_id, count(1)
>  from mysql_user_blacklist
>  group by user_id
>  );
> 
>  select t1.`user_id`
>  , t1.`event_type`
>  , t1.`current_ts`
>  from kafka_user_event t1
>  left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
>  t1.`user_id` = t2.`user_id`
>  where t1.`event_type` = ‘LOGIN’
> 
>  异常信息:
>  org.apache.flink.table.api.TableException: Processing-time temporal join
>  is not supported yet.
>  at
>  
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>  at
>  
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>  at
>  
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>  at
>  
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>  at
>  
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> 
> 
> 
> 
>   在 2021年8月4日,14:18,Caizhi Weng   
>   Hi!
>  
>   这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
>  
>   为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time 
> temporal
>   table join 了。
>  
>   carlc   
>   请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>  
>   -- 模拟需求(有点牵强...):
>   -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
>  mysql_user_blacklist
>   统计对应 user_id 在维表中的次数 - 即: 在维表上做聚合操作
>  
>   -- 1. 创建user_blacklist表
>   CREATE TABLE `user_blacklist` (
>   `user_id` bigint(20) NOT NULL,
>   `create_time` datetime NOT NULL,
>   PRIMARY KEY (`user_id`,`create_time`)
>   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>   INSERT INTO user_blacklist (`user_id`, `create_time`)
>   VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
>   (2,'2021-01-04 00:00:00');
>  
>   -- 2. 模拟kafka数据:
>   -- 第1条: 
> {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
>   00:00:00"}
>   -- 第2条: 
> {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
>   00:00:00"}
>  
>   -- 操作步骤:
>   当发送第1条kafka数据得到如下输出:
>   | OP| user_id| event_type | current_ts| bl_count |
>   | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
>   | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
>   当再次发送第1条kafka数据得到如下输出:
>   | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
>   | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>  
>   — SQL 如下:
>  
>   create table kafka_user_event
>   (
>   `user_id` BIGINT,
>   `event_type` STRING,
>   `current_ts` timestamp(3),
>   `proc_time` AS PROCTIME()
>   ) WITH (
>   'connector' = 'kafka',
>   ...
>   );
>  
>   create table mysql_user_blacklist
>   (
>   user_id BIGINT,
>   create_time timestamp(3),
>   primary key (user_id,create_time) not enforced
>   ) WITH (
>   'connector' = 'jdbc',
>   …
>   );
>  
>   create view v2_user_event as (
>   select t1.`user_id`
>   , t1.`event_type`
>   , t1.`current_ts`
>   , count(1) over ( partition by t2.`user_id` order by 
> t1.`proc_time` ROWS
>   BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
>   from kafka_user_event t1
>   left join mysql_user_blacklist FOR SYSTEM_TIME AS OF 
> t1.`proc_time` AS
>  t2
>   on t1.`user_id` = t2.`user_id`
>   where t1.`event_type` = 'LOGIN'
>   );
>  
>   select * from v2_user_event;
>  
>  
> 
> 



Re: 1.14啥时候出呀

2021-08-04 文章 Jingsong Li
1.14还有1-2个月
1.13.2马上就出了,估计明天或后天或周一

On Wed, Aug 4, 2021 at 4:48 PM yidan zhao  wrote:

> 如题,1.14或1.13.2啥时候出呀,有人知道吗。
>


-- 
Best, Jingsong Lee


?????? Flink sql ????????????????

2021-08-04 文章 ????
??lookup??on??key,??
??batch




----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

carlc 

1.14啥时候出呀

2021-08-04 文章 yidan zhao
如题,1.14或1.13.2啥时候出呀,有人知道吗。


Re: Flink sql 维表聚合问题请教

2021-08-04 文章 Caizhi Weng
Hi!

我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

carlc  于2021年8月4日周三 下午3:57写道:

> 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
>
> create view v_bl_user_count as (
> select user_id, count(1)
> from mysql_user_blacklist
> group by user_id
> );
>
> select t1.`user_id`
>  , t1.`event_type`
>  , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = ‘LOGIN’
>
> 异常信息:
> org.apache.flink.table.api.TableException: Processing-time temporal join
> is not supported yet.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
>
>
>
> > 在 2021年8月4日,14:18,Caizhi Weng  写道:
> >
> > Hi!
> >
> > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> >
> > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> > table join 了。
> >
> > carlc  于2021年8月4日周三 上午10:41写道:
> >
> >> 请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> >>
> >> -- 模拟需求(有点牵强...):
> >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> mysql_user_blacklist
> >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
> >>
> >> -- 1. 创建user_blacklist表
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
> >>
> >> -- 2. 模拟kafka数据:
> >> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> >> 00:00:00"}
> >> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> >> 00:00:00"}
> >>
> >> -- 操作步骤:
> >> 当发送第1条kafka数据得到如下输出:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> 当再次发送第1条kafka数据得到如下输出:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> — SQL 如下:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> …
> >> );
> >>
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS
> t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>


Re: Flink sql 维表聚合问题请教

2021-08-04 文章 carlc
感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。

create view v_bl_user_count as (
select user_id, count(1)
from mysql_user_blacklist
group by user_id
);

select t1.`user_id`
 , t1.`event_type`
 , t1.`current_ts`
from kafka_user_event t1
left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on 
t1.`user_id` = t2.`user_id`
where t1.`event_type` = ‘LOGIN’

异常信息: 
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)




> 在 2021年8月4日,14:18,Caizhi Weng  写道:
> 
> Hi!
> 
> 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> 
> 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> table join 了。
> 
> carlc  于2021年8月4日周三 上午10:41写道:
> 
>> 请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>> 
>> -- 模拟需求(有点牵强...):
>> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
>> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>> 
>> -- 1. 创建user_blacklist表
>> CREATE TABLE `user_blacklist` (
>> `user_id` bigint(20) NOT NULL,
>> `create_time` datetime NOT NULL,
>> PRIMARY KEY (`user_id`,`create_time`)
>> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>> INSERT INTO user_blacklist (`user_id`, `create_time`)
>> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
>> (2,'2021-01-04 00:00:00');
>> 
>> -- 2. 模拟kafka数据:
>> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
>> 00:00:00"}
>> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
>> 00:00:00"}
>> 
>> -- 操作步骤:
>> 当发送第1条kafka数据得到如下输出:
>> | OP| user_id| event_type | current_ts| bl_count |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
>> 当再次发送第1条kafka数据得到如下输出:
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>> 
>> — SQL 如下:
>> 
>> create table kafka_user_event
>> (
>> `user_id` BIGINT,
>> `event_type` STRING,
>> `current_ts` timestamp(3),
>> `proc_time` AS PROCTIME()
>> ) WITH (
>> 'connector' = 'kafka',
>> ...
>> );
>> 
>> create table mysql_user_blacklist
>> (
>> user_id BIGINT,
>> create_time timestamp(3),
>> primary key (user_id,create_time) not enforced
>> ) WITH (
>> 'connector' = 'jdbc',
>> …
>> );
>> 
>> create view v2_user_event as (
>> select t1.`user_id`
>> , t1.`event_type`
>> , t1.`current_ts`
>> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
>> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
>> from kafka_user_event t1
>> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
>> on t1.`user_id` = t2.`user_id`
>> where t1.`event_type` = 'LOGIN'
>> );
>> 
>> select * from v2_user_event;
>> 
>> 



Re: 几个Flink 1.12. 2超时问题

2021-08-04 文章 Chenyu Zheng
目前是在所有taskmanager容器都成功启动之后,才出现的timeout,所以不可能是调度层面的问题。
目前我们在网络层面使用的是生产环境的网络,该环境被用于跑大量的生产流量,也没有其他容器反馈过类似问题。

我目前还是比较怀疑flink本身的某个配置导致了这个现象,请问flink是否有相关的metrics或日志可以参考?

On 2021/8/4, 11:50 AM, "东东"  wrote:



应该可以从两个层面查一下:
1、调度层面。native 
application是先启动JM容器,然后由JM容器与K8s交互拉起TM的,可以看一下K8s日志,看看整个流程是否有瓶颈点,比如镜像的拉取,TM容器的启动之类。

2、网络层面。如果调度没有问题,各容器启动的过程和速度都很正常,那就要看网络层面是否存在瓶颈,必要的时候可以tcpdump一下。







在 2021-08-03 14:02:53,"Chenyu Zheng"  写道:

开发者您好,



我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native 
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。



在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。




我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢?



附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。



谢谢!




Re: flink 1.13.1 使用hive方言,执行hive sql解析报错

2021-08-04 文章 Rui Li
用1.1.0试了一下也没复现,你insert语句中的中文如果换成英文试试看解析能不能过呢

On Mon, Aug 2, 2021 at 3:05 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> hive 1.1.0版本
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> lirui.fu...@gmail.com;
> 发送时间:2021年8月2日(星期一) 中午12:23
> 收件人:"user-zh"
> 主题:Re: flink 1.13.1 使用hive方言,执行hive sql解析报错
>
>
>
> 我本地试了一下没有复现你的问题,你的hive版本是什么呢?
>
> On Fri, Jul 30, 2021 at 3:00 PM Asahi Lee <978466...@qq.com.invalid
> wrote:
>
>  CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramvalue`(
>  nbsp; `paramvalue_id` string COMMENT '',nbsp;
>  nbsp; `platform_id` string COMMENT '',nbsp;
>  nbsp; `equipment_id` string COMMENT '',nbsp;
>  nbsp; `param_id` string COMMENT '',nbsp;
>  nbsp; `param_value` string COMMENT '',nbsp;
>  nbsp; `remark` string COMMENT '',nbsp;
>  nbsp; `create_time` string COMMENT '',nbsp;
>  nbsp; `creator` string COMMENT '',nbsp;
>  nbsp; `update_time` string COMMENT '',nbsp;
>  nbsp; `update_person` string COMMENT '',nbsp;
>  nbsp; `record_flag` double COMMENT '',nbsp;
>  nbsp; `subject_id` string COMMENT '',nbsp;
>  nbsp; `output_unit` string COMMENT '',nbsp;
>  nbsp; `show_seq` double COMMENT '')
>  COMMENT ''
>  ROW FORMAT SERDEnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'nbsp;
>  WITH SERDEPROPERTIES (nbsp;
>  nbsp; 'field.delim'=',',nbsp;
>  nbsp; 'serialization.format'=',')nbsp;
>  STORED AS INPUTFORMATnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io
> .orc.OrcInputFormat'nbsp;
>  OUTPUTFORMATnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
>  LOCATION
>  nbsp;
> 
> 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_maindata_iadc_paramvalue'
>  TBLPROPERTIES (
>  nbsp; 'COLUMN_STATS_ACCURATE'='false',nbsp;
>  nbsp; 'last_modified_by'='root',nbsp;
>  nbsp; 'last_modified_time'='1621834335',nbsp;
>  nbsp; 'numFiles'='0',nbsp;
>  nbsp; 'numRows'='-1',nbsp;
>  nbsp; 'rawDataSize'='-1',nbsp;
>  nbsp; 'totalSize'='0',nbsp;
>  nbsp; 'transient_lastDdlTime'='1621834335')
> 
> 
> 
>  CREATE TABLE `cosldatacenter.ods_emp_md_large_equip`(
>  nbsp; `large_equip_id` string COMMENT '',nbsp;
>  nbsp; `equip_name` string COMMENT '',nbsp;
>  nbsp; `equip_type` string COMMENT '',nbsp;
>  nbsp; `equip_function` string COMMENT '',nbsp;
>  nbsp; `equip_board` string COMMENT '',nbsp;
>  nbsp; `ship_yard` string COMMENT '',nbsp;
>  nbsp; `manufacturer_date` string COMMENT '',nbsp;
>  nbsp; `enqueue_date` string COMMENT '',nbsp;
>  nbsp; `dockrepair_date` string COMMENT '',nbsp;
>  nbsp; `scrap_date` string COMMENT '',nbsp;
>  nbsp; `enqueue_mode` string COMMENT '',nbsp;
>  nbsp; `work_for_org` string COMMENT '',nbsp;
>  nbsp; `work_in_org` string COMMENT '',nbsp;
>  nbsp; `old_age` string COMMENT '',nbsp;
>  nbsp; `create_time` date COMMENT '',nbsp;
>  nbsp; `creator` string COMMENT '',nbsp;
>  nbsp; `update_time` date COMMENT '',nbsp;
>  nbsp; `update_person` string COMMENT '',nbsp;
>  nbsp; `record_flag` double COMMENT '',nbsp;
>  nbsp; `data_timestamp` string COMMENT '',nbsp;
>  nbsp; `work_unit_id` string COMMENT '',nbsp;
>  nbsp; `work_status` string COMMENT '',nbsp;
>  nbsp; `work_location` string COMMENT '',nbsp;
>  nbsp; `work_area` string COMMENT '',nbsp;
>  nbsp; `equip_code` string COMMENT '',nbsp;
>  nbsp; `shi_main_power` double COMMENT '',nbsp;
>  nbsp; `shi_total_len` double COMMENT '',nbsp;
>  nbsp; `shi_type_width` double COMMENT '',nbsp;
>  nbsp; `shi_type_depth` double COMMENT '',nbsp;
>  nbsp; `shi_design_draft` double COMMENT '',nbsp;
>  nbsp; `shi_total_tonnage` double COMMENT '',nbsp;
>  nbsp; `shi_load_tonnage` double COMMENT '',nbsp;
>  nbsp; `remark` string COMMENT '',nbsp;
>  nbsp; `unit_classification1` string COMMENT '',nbsp;
>  nbsp; `unit_classification2` string COMMENT '')
>  COMMENT ''
>  ROW FORMAT SERDEnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'nbsp;
>  WITH SERDEPROPERTIES (nbsp;
>  nbsp; 'field.delim'=',',nbsp;
>  nbsp; 'serialization.format'=',')nbsp;
>  STORED AS INPUTFORMATnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io
> .orc.OrcInputFormat'nbsp;
>  OUTPUTFORMATnbsp;
>  nbsp; 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
>  LOCATION
>  nbsp;
> 
> 'hdfs://hadoop02:8020/user/hive/warehouse/cosldatacenter.db/ods_emp_md_large_equip'
>  TBLPROPERTIES (
>  nbsp; 'COLUMN_STATS_ACCURATE'='false',nbsp;
>  nbsp; 'last_modified_by'='root',nbsp;
>  nbsp; 'last_modified_time'='1621834338',nbsp;
>  nbsp; 'numFiles'='0',nbsp;
>  nbsp; 'numRows'='-1',nbsp;
>  nbsp; 'rawDataSize'='-1',nbsp;
>  nbsp; 'totalSize'='0',nbsp;
>  nbsp; 'transient_lastDdlTime'='1621834338')
> 
> 
> 
>  CREATE TABLE `cosldatacenter.ods_emp_maindata_iadc_paramdef`(
>  nbsp; `param_id` string COMMENT '',nbsp;
>  nbsp; `iadc_id` string COMMENT '',nbsp;
>  nbsp; `param_code` string COMMENT '',nbsp;
>  nbsp; `param_en` string COMMENT '',nbsp;
>  nbsp; `param_cn` string COMMENT '',nbsp;
>  nbsp; `output_standard` 

Re: flink table over 窗口报错

2021-08-04 文章 yanyunpeng
Table table = tableEnv
 .from("t_yyp_test")
 .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
 .orderBy($("f_time"))
 .preceding("unbounded_range")
 .following(CURRENT_RANGE)
 .as("w"))
 .select($("f_value"),
 $("f_h"),
 $("f_l"),
 $("f_j"),
 $("f_value").avg().over($("w")),
 $("f_value").varPop().over($("w")),
 $("f_value").stddevPop().over($("w")));
也是一样的
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.

在 2021年8月4日 14:34,Caizhi Weng 写道:


Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 
 于2021年8月4日周三 下午2:30写道: > 代码如下: > 
EnvironmentSettings bbSettings = > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > 
TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h 
STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + 
> " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY 
KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") 
WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + 
> " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = 
'123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new 
GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > 
.orderBy($("f_time_bak")) > .preceding("unbounded_range") > 
.following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > 
$("f_j"), > $("f_value").avg().over($("w")), > 
$("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 
> > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be defined on a 
time attribute. > > > 请问这是什么原因

Re: flink table over 窗口报错

2021-08-04 文章 Caizhi Weng
Hi!

order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。

yanyunpeng  于2021年8月4日周三 下午2:30写道:

> 代码如下:
> EnvironmentSettings bbSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
> tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
> "  f_id INT,\n" +
> "  f_h STRING,\n" +
> "  f_l STRING,\n" +
> "  f_j STRING,\n" +
> "  f_value DOUBLE,\n" +
> "  f_time TIMESTAMP(3)\n, " +
> "  f_time_bak TIMESTAMP(3)\n, " +
> "  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
> "  WATERMARK FOR f_time AS f_time \n" +
> ") WITH (\n" +
> "   'connector' = 'jdbc',\n" +
> "   'url' = 'jdbc:mysql://***',\n" +
> "   'table-name' = '123',\n" +
> "   'username' = '123',\n" +
> "   'password' = '123'\n" +
> ")");
> tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
> Table table = tableEnv
> .from("t_yyp_test")
> .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
> .orderBy($("f_time_bak"))
> .preceding("unbounded_range")
> .following(CURRENT_RANGE)
> .as("w"))
> .select($("f_h"),
> $("f_l"),
> $("f_j"),
> $("f_value").avg().over($("w")),
> $("f_value").varPop().over($("w")),
> $("f_value").stddevPop().over($("w")));
>
>
> 已经定义了eventTime  使用eventTIme或者别的时间字段排序都报错
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Ordering must be defined on a time attribute.
>
>
> 请问这是什么原因


flink table over 窗口报错

2021-08-04 文章 yanyunpeng
代码如下:
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
"  f_id INT,\n" +
"  f_h STRING,\n" +
"  f_l STRING,\n" +
"  f_j STRING,\n" +
"  f_value DOUBLE,\n" +
"  f_time TIMESTAMP(3)\n, " +
"  f_time_bak TIMESTAMP(3)\n, " +
"  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
"  WATERMARK FOR f_time AS f_time \n" +
") WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://***',\n" +
"   'table-name' = '123',\n" +
"   'username' = '123',\n" +
"   'password' = '123'\n" +
")");
tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
Table table = tableEnv
.from("t_yyp_test")
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
.orderBy($("f_time_bak"))
.preceding("unbounded_range")
.following(CURRENT_RANGE)
.as("w"))
.select($("f_h"),
$("f_l"),
$("f_j"),
$("f_value").avg().over($("w")),
$("f_value").varPop().over($("w")),
$("f_value").stddevPop().over($("w")));


已经定义了eventTime  使用eventTIme或者别的时间字段排序都报错


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.  


请问这是什么原因

Re: Flink sql 维表聚合问题请教

2021-08-04 文章 Caizhi Weng
Hi!

这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。

为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
table join 了。

carlc  于2021年8月4日周三 上午10:41写道:

> 请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>
> -- 模拟需求(有点牵强...):
> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>
> -- 1. 创建user_blacklist表
> CREATE TABLE `user_blacklist` (
> `user_id` bigint(20) NOT NULL,
> `create_time` datetime NOT NULL,
> PRIMARY KEY (`user_id`,`create_time`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> INSERT INTO user_blacklist (`user_id`, `create_time`)
> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> (2,'2021-01-04 00:00:00');
>
> -- 2. 模拟kafka数据:
> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> 00:00:00"}
> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> 00:00:00"}
>
> -- 操作步骤:
> 当发送第1条kafka数据得到如下输出:
> | OP| user_id| event_type | current_ts| bl_count |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> 当再次发送第1条kafka数据得到如下输出:
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>
> — SQL 如下:
>
> create table kafka_user_event
> (
> `user_id` BIGINT,
> `event_type` STRING,
> `current_ts` timestamp(3),
> `proc_time` AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> ...
> );
>
> create table mysql_user_blacklist
> (
> user_id BIGINT,
> create_time timestamp(3),
> primary key (user_id,create_time) not enforced
> ) WITH (
> 'connector' = 'jdbc',
> …
> );
>
> create view v2_user_event as (
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> from kafka_user_event t1
> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
> on t1.`user_id` = t2.`user_id`
> where t1.`event_type` = 'LOGIN'
> );
>
> select * from v2_user_event;
>
>