Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
好的,我试一下。谢谢

Best

Jark Wu  于2020年11月23日周一 下午2:06写道:

> 那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 13:16, jy l  wrote:
>
> > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
> > 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es
> >
> > Jark Wu  于2020年11月23日周一 上午10:35写道:
> >
> > > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> > > 你可以使用非 window 聚合来代替。
> > >
> > > Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
> > >
> > > > Hi:
> > > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > > > [image: image.png]
> > > > [image: image.png]
> > > > 分组计算的SQL如下:
> > > > [image: image.png]
> > > > 在执行计算时,报了如下异常:
> > > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > > GroupWindowAggregate doesn't support consuming update and delete
> > changes
> > > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > > default_database, t_order,
> > watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > > orderInformationId, userId, categoryId, productId, price,
> productCount,
> > > > priceSum, shipAddress, receiverAddress])
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > >
> > > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > > > 那面对我这样的情况,该用什么方案来解决?
> > > > 望知道的各位告知一下,感谢!
> > > >
> > > > 祝好
> > > >
> > > >
> > >
> >
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
页面。

1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。

Jark Wu  于2020年11月23日周一 下午3:32写道:

> 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>
> On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
>
> > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > >
> >
> > Flink uses the primary key that defined in DDL when writing data to
> > external databases. The connector operate in upsert mode if the primary
> key
> > was defined, otherwise, the connector operate in append mode.
> >
> > In upsert mode, Flink will insert a new row or update the existing row
> > according to the primary key, Flink can ensure the idempotence in this
> way.
> > To guarantee the output result is as expected, it’s recommended to define
> > primary key for the table and make sure the primary key is one of the
> > unique key sets or primary key of the underlying database table. In
> append
> > mode, Flink will interpret all records as INSERT messages, the INSERT
> > operation may fail if a primary key or unique constraint violation
> happens
> > in the underlying database.
> >
> > See CREATE TABLE DDL
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > >
> > for
> > more details about PRIMARY KEY syntax.
> >
> >
> > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> messages,
> > the INSERT operation may fail if a primary key or unique constraint
> > violation happens in the underlying database.  什么叫append
> > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> >
> > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> >
> >
> >
> > 赵一旦  于2020年11月23日周一 下午3:02写道:
> >
> > > 补充sql:
> > >
> > > DDL:
> > >
> > > CREATE TABLE flink_recent_pv_subid
> > > (
> > > `supply_id` STRING,
> > > `subid` STRING,
> > > `mark`  STRING,
> > > `time`  STRING,
> > > `pv`BIGINT,
> > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > ) WITH (
> > >   'connector.type' = 'jdbc',
> > >
> > >   ..
> > >
> > > );
> > >
> > >
> > > 查询SQL:
> > >
> > > INSERT INTO
> > > flink_recent_pv_subid
> > > SELECT
> > > `sid`,
> > > `subid`,
> > > `mark`,
> > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > 'MMddHHmm') as `time`,
> > > count(1) AS `pv`
> > > FROM baidu_log_view
> > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> MINUTE);
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > >
> > >> @hailongwang 一样的。
> > >>
> > >> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > >>
> > >>
> > >>
> > >>
> > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > >>
> > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > >>>
> > >>>
> > >>> Best,
> > >>> Hailong
> > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> > >>> >duplicate方式写入。
> > >>> >
> > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > >>> Method)
> > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > >>> >NativeConstructorAccessorImpl.java:62)
> > >>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > >>> >DelegatingConstructorAccessorImpl.java:45)
> > >>> >at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > >>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > >>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > >>> >at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > >>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > >>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > >>> >at
> com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> > >>> >at
> > >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> > >>> >.java:2157)
> > >>> >at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> >.java:2460)
> > >>> >at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> >.java:2377)
> > >>> >at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 Jark Wu
请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。

On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:

> 如下是Flink官方文档JBDC connector的部分内容。Key handling
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >
>
> Flink uses the primary key that defined in DDL when writing data to
> external databases. The connector operate in upsert mode if the primary key
> was defined, otherwise, the connector operate in append mode.
>
> In upsert mode, Flink will insert a new row or update the existing row
> according to the primary key, Flink can ensure the idempotence in this way.
> To guarantee the output result is as expected, it’s recommended to define
> primary key for the table and make sure the primary key is one of the
> unique key sets or primary key of the underlying database table. In append
> mode, Flink will interpret all records as INSERT messages, the INSERT
> operation may fail if a primary key or unique constraint violation happens
> in the underlying database.
>
> See CREATE TABLE DDL
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >
> for
> more details about PRIMARY KEY syntax.
>
>
> 这里也有一点,In append mode, Flink will interpret all records as INSERT messages,
> the INSERT operation may fail if a primary key or unique constraint
> violation happens in the underlying database.  什么叫append
> mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>
> 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>
>
>
> 赵一旦  于2020年11月23日周一 下午3:02写道:
>
> > 补充sql:
> >
> > DDL:
> >
> > CREATE TABLE flink_recent_pv_subid
> > (
> > `supply_id` STRING,
> > `subid` STRING,
> > `mark`  STRING,
> > `time`  STRING,
> > `pv`BIGINT,
> > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > ) WITH (
> >   'connector.type' = 'jdbc',
> >
> >   ..
> >
> > );
> >
> >
> > 查询SQL:
> >
> > INSERT INTO
> > flink_recent_pv_subid
> > SELECT
> > `sid`,
> > `subid`,
> > `mark`,
> > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> 'MMddHHmm') as `time`,
> > count(1) AS `pv`
> > FROM baidu_log_view
> > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
> >
> >
> > 赵一旦  于2020年11月23日周一 下午3:00写道:
> >
> >> @hailongwang 一样的。
> >>
> >> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> >>
> >>
> >>
> >>
> >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> >>
> >>> 数据库中主键的设置跟 primary key 定义的一样不?
> >>>
> >>>
> >>> Best,
> >>> Hailong
> >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >>> >duplicate方式写入。
> >>> >
> >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >>> Method)
> >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >>> >NativeConstructorAccessorImpl.java:62)
> >>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >>> >DelegatingConstructorAccessorImpl.java:45)
> >>> >at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >>> >at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >>> >at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >>> >at
> >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >>> >.java:2157)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2460)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2377)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2361)
> >>> >at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >>> >PreparedStatement.java:1793)
> >>> >
> >>> >(2)
> >>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >>> >但这个冲突的entry是在14.11分那一波才报错的。
> >>>
> >>
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
如下是Flink官方文档JBDC connector的部分内容。Key handling


Flink uses the primary key that defined in DDL when writing data to
external databases. The connector operate in upsert mode if the primary key
was defined, otherwise, the connector operate in append mode.

In upsert mode, Flink will insert a new row or update the existing row
according to the primary key, Flink can ensure the idempotence in this way.
To guarantee the output result is as expected, it’s recommended to define
primary key for the table and make sure the primary key is one of the
unique key sets or primary key of the underlying database table. In append
mode, Flink will interpret all records as INSERT messages, the INSERT
operation may fail if a primary key or unique constraint violation happens
in the underlying database.

See CREATE TABLE DDL

for
more details about PRIMARY KEY syntax.


这里也有一点,In append mode, Flink will interpret all records as INSERT messages,
the INSERT operation may fail if a primary key or unique constraint
violation happens in the underlying database.  什么叫append
mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?

1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。



赵一旦  于2020年11月23日周一 下午3:02写道:

> 补充sql:
>
> DDL:
>
> CREATE TABLE flink_recent_pv_subid
> (
> `supply_id` STRING,
> `subid` STRING,
> `mark`  STRING,
> `time`  STRING,
> `pv`BIGINT,
> PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> ) WITH (
>   'connector.type' = 'jdbc',
>
>   ..
>
> );
>
>
> 查询SQL:
>
> INSERT INTO
> flink_recent_pv_subid
> SELECT
> `sid`,
> `subid`,
> `mark`,
> DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 
> 'MMddHHmm') as `time`,
> count(1) AS `pv`
> FROM baidu_log_view
> GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
>
>
> 赵一旦  于2020年11月23日周一 下午3:00写道:
>
>> @hailongwang 一样的。
>>
>> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>>
>>
>>
>>
>> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
>>
>>> 数据库中主键的设置跟 primary key 定义的一样不?
>>>
>>>
>>> Best,
>>> Hailong
>>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
>>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>>> >duplicate方式写入。
>>> >
>>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>>> >NativeConstructorAccessorImpl.java:62)
>>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>>> >DelegatingConstructorAccessorImpl.java:45)
>>> >at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
>>> >at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>>> >at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>>> >at
>>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>>> >.java:2157)
>>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2460)
>>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2377)
>>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2361)
>>> >at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>>> >PreparedStatement.java:1793)
>>> >
>>> >(2)
>>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>>> >但这个冲突的entry是在14.11分那一波才报错的。
>>>
>>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
补充sql:

DDL:

CREATE TABLE flink_recent_pv_subid
(
`supply_id` STRING,
`subid` STRING,
`mark`  STRING,
`time`  STRING,
`pv`BIGINT,
PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',

  ..

);


查询SQL:

INSERT INTO
flink_recent_pv_subid
SELECT
`sid`,
`subid`,
`mark`,
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'MMddHHmm') as `time`,
count(1) AS `pv`
FROM baidu_log_view
GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);


赵一旦  于2020年11月23日周一 下午3:00写道:

> @hailongwang 一样的。
>
> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>
>
>
>
> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
>
>> 数据库中主键的设置跟 primary key 定义的一样不?
>>
>>
>> Best,
>> Hailong
>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>> >duplicate方式写入。
>> >
>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>> >NativeConstructorAccessorImpl.java:62)
>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>> >DelegatingConstructorAccessorImpl.java:45)
>> >at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
>> >at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>> >at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>> >at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>> >.java:2157)
>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2460)
>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2377)
>> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2361)
>> >at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>> >PreparedStatement.java:1793)
>> >
>> >(2)
>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>> >但这个冲突的entry是在14.11分那一波才报错的。
>>
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
@hailongwang 一样的。

有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。




hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:

> 数据库中主键的设置跟 primary key 定义的一样不?
>
>
> Best,
> Hailong
> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >duplicate方式写入。
> >
> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >NativeConstructorAccessorImpl.java:62)
> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >DelegatingConstructorAccessorImpl.java:45)
> >at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >.java:2157)
> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2460)
> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2377)
> >at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2361)
> >at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >PreparedStatement.java:1793)
> >
> >(2)
> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >但这个冲突的entry是在14.11分那一波才报错的。
>


Re: TUMBLE函数不支持 回撤流

2020-11-22 文章 赵一旦
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。

LakeShen  于2020年11月4日周三 上午10:12写道:

> Hi 夜思流年梦,
>
> 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
> 如果是 retract ,应该就不能再上面进行窗口计算了。
>
> Best,
> LakeShen
>
> 史 正超  于2020年11月3日周二 下午6:34写道:
>
> > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> > UPDATE DELETE, 相关代码如下:
> >
> > @Override
> > public ChangelogMode getChangelogMode() {
> >return ChangelogMode.newBuilder()
> >   .addContainedKind(RowKind.INSERT)
> >   .addContainedKind(RowKind.UPDATE_BEFORE)
> >   .addContainedKind(RowKind.UPDATE_AFTER)
> >   .addContainedKind(RowKind.DELETE)
> >   .build();
> > }
> >
> > 所以在window里消费带有update和delete的数据现在应该是不支持的。
> > 
> > 发件人: 夜思流年梦 
> > 发送时间: 2020年11月3日 9:46
> > 收件人: user-zh@flink.apache.org 
> > 主题: TUMBLE函数不支持 回撤流
> >
> >
> >
> >
> > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
> >
> >
> >
> >
> >
> >
> >
> > 原sql
> >
> > select 0 as id
> >
> > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> >
> > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> > then memberid else NULL end) as paynum_h
> >
> > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> > '-MM-dd')  then real_product else 0 end)) as paymoney_h
> >
> > from dwd_XXX
> >
> > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> >
> > group by TUMBLE(proctime ,interval '1' HOUR);
> >
> >
> > 报错:
> >  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > 发现把kafka建表语句改成 json格式就可以
> >
> >
> > 数据源不是flink-mysql-cdc得来的
> >
> >
> > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> >
> >
> >  'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XX',
> >   'topic' = 'ODS_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'canal-json');
> >
> >
> > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> > 建kafka表的格式,使用的changelog-json:
> >
> >
> > WITH (
> >   'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XXX',
> >   'topic' = 'DWD_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'changelog-json');
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
> > >Hi,
> > >能贴一下完整的sql吗,数据源是CDC的数据吗?
> > >
> > >> 2020年10月30日 下午2:48,夜思流年梦  写道:
> > >>
> > >> 开发者你好:
> > >> 现有此场景:
> > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> > >> select
> > >>
> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> > >>
> > >>> ,sum(amt) as paymoney_h
> > >>
> > >>> from 
> > >>
> > >>> group by TUMBLE(write_time,interval '1' HOUR);
> > >>
> > >>
> > >> 报错:
> > >> org.apache.flink.table.api.TableException: GroupWindowAggregate
> doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > >>
> > >>
> > >>
> > >>
> > >> 发现把kafka建表语句改成 json格式就可以
> > >>
> > >>
> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> >
> >
> >
> >
> >
> >
>


答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 文章 sherlock zw
多谢指点,试了下,返回 Tuple 类型作为 key 是可以按多个字段进行分组的,拼接成 String 的话应该也是可以的
final SingleOutputStreamOperator> sum = flatMap
.keyBy(new KeySelector, 
Tuple2>() {
@Override
public Tuple2 getKey(Tuple3 tuple3) throws Exception {
return Tuple2.of(tuple3.f0, tuple3.f1);
}
})
.sum(2);

-邮件原件-
发件人: 赵一旦  
发送时间: 2020年11月23日 14:35
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

(1)返回字符串,自己拼接就可以。
(2)返回Tuple类型作为Key。
1.10到1.11相当于是去除了多key的辅助keyBy方法,本身内部就是组成tuple。原因不清楚。

sherlock zw  于2020年11月20日周五 上午11:18写道:

> 我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector
> key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t ->
> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?
>
> -邮件原件-
> 发件人: guanxianchun 
> 发送时间: 2020年11月19日 20:53
> 收件人: user-zh@flink.apache.org
> 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
>
> flink-1.11使用KeySelector
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 hailongwang
数据库中主键的设置跟 primary key 定义的一样不?


Best,
Hailong
在 2020-11-23 13:15:01,"赵一旦"  写道:
>如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>duplicate方式写入。
>
>但我在使用中,发现报了 duplicate entry的错误。例如:
>Caused by: com.mysql.jdbc.exceptions.jdbc4.
>MySQLIntegrityConstraintViolationException: Duplicate entry
>'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>NativeConstructorAccessorImpl.java:62)
>at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>DelegatingConstructorAccessorImpl.java:45)
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>at com.mysql.jdbc.Util.getInstance(Util.java:386)
>at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>.java:2157)
>at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2460)
>at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2377)
>at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2361)
>at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>PreparedStatement.java:1793)
>
>(2)
>此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>但这个冲突的entry是在14.11分那一波才报错的。


Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-22 文章 赵一旦
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。

huang botao  于2020年11月19日周四 下午12:58写道:

> hi, zhisheng, hailongwang:
>
> 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect()
> 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。
>
>
>
> On Wed, Nov 18, 2020 at 10:46 PM zhisheng  wrote:
>
> > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
> >
> > Best
> > zhisheng
> >
> > huang botao  于2020年11月18日周三 下午10:34写道:
> >
> > > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime
> > >
> > > StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法
> > >
> > >
> > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com>
> wrote:
> > >
> > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
> > > >
> > > > 在 2020-11-18 15:29:54,"huang botao"  写道:
> > > > >Hi ,请教一个奇怪的问题:
> > > > >
> > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > > >
> > > > >.assignTimestampsAndWatermarks(new
> > > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > > >
> > > > >.connect(ruleConfigSource)
> > > > >.process(new MetricDataFilterProcessFunction())
> > > > >.keyBy((KeySelector) metric -> {
> > > > >MetricDataKey metricDataKey = new MetricDataKey();
> > > > >metricDataKey.setDomain(metric.getDomain());
> > > > >metricDataKey.setStationAliasCode(metric.getStaId());
> > > > >metricDataKey.setEquipMK(metric.getEquipMK());
> > > > >metricDataKey.setEquipID(metric.getEquipID());
> > > > >metricDataKey.setMetric(metric.getMetric());
> > > > >return metricDataKey;
> > > > >})
> > > > >
> > > > >.window(SlidingEventTimeWindows.of(Time.seconds(2),
> > > Time.seconds(1)))
> > > > >.apply(new RichWindowFunction > > > >MetricDataKey, TimeWindow>() {
> > > > >@Override
> > > > >public void apply(MetricDataKey tuple, TimeWindow window,
> > > > >Iterable input, Collector out) throws
> > > > >Exception {
> > > > >input.forEach(x->{
> > > > >System.out.println("--->>>"+x);
> > > > >});
> > > > >}
> > > > >})
> > > > >
> > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的
> System.out.println("--->>>"+x);
> > > > >
> > > > >
> > > > >数据一直在消费着,没有任何报错信息
> > > >
> > >
> >
>


Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 文章 赵一旦
(1)返回字符串,自己拼接就可以。
(2)返回Tuple类型作为Key。
1.10到1.11相当于是去除了多key的辅助keyBy方法,本身内部就是组成tuple。原因不清楚。

sherlock zw  于2020年11月20日周五 上午11:18写道:

> 我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector
> key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t ->
> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?
>
> -邮件原件-
> 发件人: guanxianchun 
> 发送时间: 2020年11月19日 20:53
> 收件人: user-zh@flink.apache.org
> 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
>
> flink-1.11使用KeySelector
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 赵一旦
如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
duplicate方式写入。

但我在使用中,发现报了 duplicate entry的错误。例如:
Caused by: com.mysql.jdbc.exceptions.jdbc4.
MySQLIntegrityConstraintViolationException: Duplicate entry
'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.Util.getInstance(Util.java:386)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
.java:2157)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2460)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2377)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2361)
at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
PreparedStatement.java:1793)

(2)
此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
但这个冲突的entry是在14.11分那一波才报错的。


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求?

Best,
Jark

On Mon, 23 Nov 2020 at 13:16, jy l  wrote:

> 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
> 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es
>
> Jark Wu  于2020年11月23日周一 上午10:35写道:
>
> > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> > 你可以使用非 window 聚合来代替。
> >
> > Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
> >
> > Best,
> > Jark
> >
> > On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
> >
> > > Hi:
> > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > > [image: image.png]
> > > [image: image.png]
> > > 分组计算的SQL如下:
> > > [image: image.png]
> > > 在执行计算时,报了如下异常:
> > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > GroupWindowAggregate doesn't support consuming update and delete
> changes
> > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > default_database, t_order,
> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > orderInformationId, userId, categoryId, productId, price, productCount,
> > > priceSum, shipAddress, receiverAddress])
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > >
> > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > > 那面对我这样的情况,该用什么方案来解决?
> > > 望知道的各位告知一下,感谢!
> > >
> > > 祝好
> > >
> > >
> >
>


Re: Flink Standalone HA问题

2020-11-22 文章 赵一旦
没看懂逻辑,flink的HA和hadoop的HA啥关系。
如果hadoop出现问题,对于flink来说hadoop是依赖,这会导致ckpt失败感觉,但和flink集群自身没关系,flink集群应该不会失败。

Fei Han  于2020年11月21日周六 上午11:51写道:

> @all!
> Flink版本是1.10.2。集群模式是Flink Standalone HA。
> 问题:
> 如果在hadoop HA 的情况下,两个namenode都宕机了。重启机器后,启动hadoop。
> Flink  Standalone HA 观察后,选举5分钟后才成功?请问是不是正常的?
>
>
>


Re: flink zeppelin的type参数(append/update/single)和flink的动态表有关系嘛

2020-11-22 文章 Jeff Zhang
和 flink 没关系,是zeppelin自己定义的参数,只影响select 语句,对于zeppelin的数据可视化有影响,不影响flink job

赵一旦  于2020年11月23日周一 下午1:11写道:

> 如题,这个参数是仅仅zeppelin自身的参数,用于决定如何展示数据之类的逻辑呢?
> 还是和flink任务也有关系。按照
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#AzSOu
> 说法,append模式第一个结果列必须是时间,所以看起来更像是zeppelin自身的要求。
>
> 我看了下append方式执行,jdbc仍然使用的upsertSink。
>
> 所以谁确认下这个参数是不是和具体任务没啥关系。
>


-- 
Best Regards

Jeff Zhang


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es

Jark Wu  于2020年11月23日周一 上午10:35写道:

> Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> 你可以使用非 window 聚合来代替。
>
> Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
>
> > Hi:
> > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > [image: image.png]
> > [image: image.png]
> > 分组计算的SQL如下:
> > [image: image.png]
> > 在执行计算时,报了如下异常:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > GroupWindowAggregate doesn't support consuming update and delete changes
> > which is produced by node TableSourceScan(table=[[default_catalog,
> > default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > orderInformationId, userId, categoryId, productId, price, productCount,
> > priceSum, shipAddress, receiverAddress])
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> >
> > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > 那面对我这样的情况,该用什么方案来解决?
> > 望知道的各位告知一下,感谢!
> >
> > 祝好
> >
> >
>


flink zeppelin的type参数(append/update/single)和flink的动态表有关系嘛

2020-11-22 文章 赵一旦
如题,这个参数是仅仅zeppelin自身的参数,用于决定如何展示数据之类的逻辑呢?
还是和flink任务也有关系。按照https://www.yuque.com/jeffzhangjianfeng/gldg8w/te2l1c#AzSOu
说法,append模式第一个结果列必须是时间,所以看起来更像是zeppelin自身的要求。

我看了下append方式执行,jdbc仍然使用的upsertSink。

所以谁确认下这个参数是不是和具体任务没啥关系。


Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-22 文章 赵一旦
这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的?
这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。

zhisheng  于2020年11月18日周三 下午10:51写道:

> 是不是有 kafka 机器挂了?
>
> Best
> zhisheng
>
> hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
>
> > 感觉还有其它 root cause,可以看下还有其它日志不?
> >
> >
> > Best,
> > Hailong
> >
> > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > >2020-11-18 16:51:37
> > >org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > >Partition
> > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > >not found.
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > >)
> > >at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > >.java:670)
> > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > >CompletableFuture.java:646)
> > >at java.util.concurrent.CompletableFuture$Completion.run(
> > >CompletableFuture.java:456)
> > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > >ForkJoinExecutorConfigurator.scala:44)
> > >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > >.java:1339)
> > >at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >at
> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > >.java:107)
> > >
> > >
> > >请问这是什么问题呢?
> >
>


Re: 请教这种数据格式怎么配置event time呢

2020-11-22 文章 赵一旦
已解决。COALESCE函数调用那的类型问题。d['server_time']是String类型,后边0是数字。

Jark Wu  于2020年11月17日周二 下午11:48写道:

> 报什么错?
>
>
> On Tue, 17 Nov 2020 at 23:43, 赵一旦  wrote:
>
> > CREATE TABLE user_log
> > (
> > d MAP,
> > process_time AS PROCTIME(),
> > event_time AS
> > TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(d['server_time'], 0) / 1000)),
> > WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> > ) WITH (
> > 'connector' = 'kafka',
> >
> >  ...
> >
> > );
> >
> > 如上,报错。貌似不支持这么玩。但是我的数据格式就是这样的,比如:
> >
> > {
> >
> >   "d": {
> >
> > "name": "abc",
> >
> > "age": 12
> >
> >   }
> >
> > }
> >
>


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
我建了个 issue 跟进这个功能:https://issues.apache.org/jira/browse/FLINK-20281

On Mon, 23 Nov 2020 at 10:35, Jark Wu  wrote:

> Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> 你可以使用非 window 聚合来代替。
>
> Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
>
>> Hi:
>> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
>> [image: image.png]
>> [image: image.png]
>> 分组计算的SQL如下:
>> [image: image.png]
>> 在执行计算时,报了如下异常:
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> GroupWindowAggregate doesn't support consuming update and delete changes
>> which is produced by node TableSourceScan(table=[[default_catalog,
>> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
>> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
>> orderInformationId, userId, categoryId, productId, price, productCount,
>> priceSum, shipAddress, receiverAddress])
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.immutable.Range.foreach(Range.scala:155)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
>> 那面对我这样的情况,该用什么方案来解决?
>> 望知道的各位告知一下,感谢!
>>
>> 祝好
>>
>>


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
你可以使用非 window 聚合来代替。

Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?

Best,
Jark

On Mon, 23 Nov 2020 at 10:28, jy l  wrote:

> Hi:
> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> [image: image.png]
> [image: image.png]
> 分组计算的SQL如下:
> [image: image.png]
> 在执行计算时,报了如下异常:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> GroupWindowAggregate doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[default_catalog,
> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> orderInformationId, userId, categoryId, productId, price, productCount,
> priceSum, shipAddress, receiverAddress])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> 那面对我这样的情况,该用什么方案来解决?
> 望知道的各位告知一下,感谢!
>
> 祝好
>
>


maven overlapp 情况下shade选择

2020-11-22 文章 赵一旦
如题,如果出现overlap,shade的选择策略有人清楚嘛。

如果我依赖某个包,重写覆盖部分类(我知道的是我的类优先级更高),但shade的时候是否也是我的类优先级更高。


FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 jy l
Hi:
我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
[image: image.png]
[image: image.png]
分组计算的SQL如下:
[image: image.png]
在执行计算时,报了如下异常:
Exception in thread "main" org.apache.flink.table.api.TableException:
GroupWindowAggregate doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[default_catalog,
default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
orderInformationId, userId, categoryId, productId, price, productCount,
priceSum, shipAddress, receiverAddress])
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
那面对我这样的情况,该用什么方案来解决?
望知道的各位告知一下,感谢!

祝好


加载外部配置文件

2020-11-22 文章 范未太
哪位大佬用过typeSafe config,flink 如何把外部.conf 配置文件,在main函数中,通过全局配置的方式,传递给function 
在open函数里调用


| |
范未太
|
|
邮箱:fwt_...@163.com
|

签名由 网易邮箱大师 定制

flink 1.11.2 SQL Client self inner join 结果不正确

2020-11-22 文章 段晓雄
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值 

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 
select 
p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, 
p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d 
from snmpprobe.p_snmp_ifxtable as p0m 
inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and 
p0m.mibindex=p5m.mibindex 
where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4 
and p5m.ip='172.31.28.4' 
and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55'; 

用flink sql client执行,计算结果是 
coltime,a,c,coltime0, b,d 
2020-11-23T01:00 ,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 
这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d 


hive 行命令查询结果是 
2020-11-23 00:55:00.0,3702187169,5541332531,2020-11-23 
01:00:00.0,3702300836,5541513669 
coltime=2020-11-23 00:55:00.0 , coltime0=2020-11-23 
01:00:00.0,a!=c, b!=d 

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗? 



flink 1.11.2 SQL Client self inner join 结果不正确

2020-11-22 文章 macdoor
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值 

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同 
select 
p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c, 
p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d 
from snmpprobe.p_snmp_ifxtable as p0m 
inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and
p0m.mibindex=p5m.mibindex 
where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4
and p5m.ip='172.31.28.4' 
and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55'; 

用flink sql client执行,计算结果是 
coltime,a,c,coltime0, b,d 
2020-11-23T01:00
,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669 
这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d 


hive 行命令查询结果是 
2020-11-23 00:55:00.0,3702187169,5541332531,2020-11-23
01:00:00.0,3702300836,5541513669 
coltime=2020-11-23 00:55:00.0 , coltime0=2020-11-23
01:00:00.0,a!=c, b!=d 

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗? 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


用flink 1.11.2 查询hive表自关联(self inner join) 结果不正确

2020-11-22 文章 macdoor
我用相同的一个表自己 inner join 自己,取不同时间点,得到相同值

sql是这样,p5m 和 p0m 都是 snmpprobe.p_snmp_ifxtable 表,时间不同
select
p0m.coltime, p0m.ifhcinoctets a, p0m.ifhcoutoctets c,
p5m.coltime, p5m.ifhcinoctets b, p5m.ifhcoutoctets d
from snmpprobe.p_snmp_ifxtable as p0m
inner join snmpprobe.p_snmp_ifxtable as p5m on p0m.id=p5m.id and
p0m.mibindex=p5m.mibindex
where p5m.dt='2020-11-23' and p5m.hh='01' and p5m.mi='00' and p5m.mibindex=4
and p5m.ip='172.31.28.4'
and p0m.dt='2020-11-23' and p0m.hh='00' and p0m.mi='55';

用flink sql client执行,计算结果是 
coltime,a,c,coltime0, b,d
2020-11-23T01:00
,3702300836,5541513669,2020-11-23T01:00,3702300836,5541513669
这里 coltime= coltime0,都是2020-11-23T01:00, 同时 a=b,c=d


hive 行命令查询结果是
2020-11-23 00:55:00.0,3702187169,5541332531,2020-11-23
01:00:00.0,3702300836,5541513669
coltime=2020-11-23 00:55:00.0 , coltime0=2020-11-23
01:00:00.0,a!=c, b!=d

flink 结果明显不正确,flink sql 的self join 需要什么特殊写法吗?




--
Sent from: http://apache-flink.147419.n8.nabble.com/