退订

2021-09-27 文章 Gauler Tan
退订


回复: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Liu Join
Exception in thread "main" org.apache.flink.table.api.TableException: 
GroupWindowAggregate doesn't support consuming update and delete changes which 
is produced by node Deduplicate(keep=[FirstRow], key=[dnt, ordernum, t1, csq, 
num, type], order=[ROWTIME])
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:165)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:322)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:204)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(

Re: flink-1.12.5 定义HIVDE DDL ,并且有comment,但是hive表里面没有体现出comment

2021-09-27 文章 Caizhi Weng
Hi!

这个问题已经在社区提出过了,可以在 https://issues.apache.org/jira/browse/FLINK-18958
这里追踪解决进度。

kcz <573693...@qq.com.invalid> 于2021年9月27日周一 上午11:21写道:

> hive版本3.1.0
> ddl如下:
> create table test_hive(
>  id int comment 'test comment'
> ) PARTITIONED BY (dt STRING) STORED AS orc TBLPROPERTIES (
>    'partition.time-extractor.kind'='custom',
>    'partition.time-extractor.timestamp-pattern'='$dt',
>  
>  'partition.time-extractor.class'='com.hycan.bigdata.utils.MyPartTimeExtractor',
>    'sink.partition-commit.trigger'='partition-time',
>    'sink.partition-commit.delay'='1 d',
>    'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> 实际使用hive desc formatted test_hive 没有看到comment


回复: flink能支持动态增加任务

2021-09-27 文章 yunying


好的好的,谢谢
| |
cuibeiyu2672
|
|
cuibeiyu2...@163.com
|
签名由网易邮箱大师定制
在2021年9月28日 10:16,Caizhi Weng 写道:
Hi!

如果你说的是对于固定的字段,每次需要过滤出来的值不一样,可以考虑维表 join。维表里保存的就是你需要过滤出来的值,这样每次只要更新维表即可。

如果你说的是每次要选择不同的字段,可能只能通过 udtf 来完成这个需求。udtf 里通过网络等方式访问外部资源来判断现在需要过滤的是哪些字段的哪些值。

yunying  于2021年9月28日周二 上午9:47写道:


flink消费一个kafka主题,比如里面有一个字段分为a,b,c,d..,现在有一个需求就是要过滤出字段是a的数据,进行后续操作。写完这个任务提交了过后,过段时间又需要过滤出字段b进行后续操作,后续的操作都是一样的,现在我就要为它在开发一个任务,在提交一次,数据量都不大。但是每提交一次都会耗费资源。以后说不定还会要过滤c,d,e有什么好办法解决这个问题吗


Unsubscribe

2021-09-27 文章 475916258
Unsubscribe

Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Caizhi Weng
Hi!

你使用的是什么 Flink 版本呢?之前的 Flink 版本 window agg 只能消费 insert only 的数据,最新的 Flink
1.14 能够支持这样的询问。

lzy139...@outlook.com  于2021年9月27日周一 上午11:37写道:

> 使用ROW_NUMBER过滤数据后,进行开窗聚合计算报错


Re: flink能支持动态增加任务

2021-09-27 文章 Caizhi Weng
Hi!

如果你说的是对于固定的字段,每次需要过滤出来的值不一样,可以考虑维表 join。维表里保存的就是你需要过滤出来的值,这样每次只要更新维表即可。

如果你说的是每次要选择不同的字段,可能只能通过 udtf 来完成这个需求。udtf 里通过网络等方式访问外部资源来判断现在需要过滤的是哪些字段的哪些值。

yunying  于2021年9月28日周二 上午9:47写道:

>
> flink消费一个kafka主题,比如里面有一个字段分为a,b,c,d..,现在有一个需求就是要过滤出字段是a的数据,进行后续操作。写完这个任务提交了过后,过段时间又需要过滤出字段b进行后续操作,后续的操作都是一样的,现在我就要为它在开发一个任务,在提交一次,数据量都不大。但是每提交一次都会耗费资源。以后说不定还会要过滤c,d,e有什么好办法解决这个问题吗


Re: 退订

2021-09-27 文章 Leonard Xu
如果需要取消订阅 user-zh@flink.apache.org  
邮件组,请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 
  即可


> 在 2021年9月27日,14:43,rzy1107  写道:
> 
> 退订



Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Leonard Xu
hi, 报错详情可以在邮件贴下吗?


> 在 2021年9月27日,11:36,lzy139...@outlook.com 写道:
> 
> 使用ROW_NUMBER过滤数据后,进行开窗聚合计算报错



flink能支持动态增加任务

2021-09-27 文章 yunying
flink消费一个kafka主题,比如里面有一个字段分为a,b,c,d..,现在有一个需求就是要过滤出字段是a的数据,进行后续操作。写完这个任务提交了过后,过段时间又需要过滤出字段b进行后续操作,后续的操作都是一样的,现在我就要为它在开发一个任务,在提交一次,数据量都不大。但是每提交一次都会耗费资源。以后说不定还会要过滤c,d,e有什么好办法解决这个问题吗

Re: flink 1.13.2 使用avg函数对int字段求平均值,输出类型为int类型,而不是浮点型

2021-09-27 文章 Shuo Cheng
by-design 的行为, avg 就是 sum / count, flink 目前行为是根据入参类型来推断返回类型 (与 `sum` 以及 `/`
保持一致), 想要保持高精度,可以考虑把入参 cast 成 double.

On Mon, Sep 27, 2021 at 2:30 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> hi!      我使用flink 1.13.2版本,在对 int 类型的字段通过avg函数求平均值时,其返回值类型为
> int ,而不是 double,decimal等浮点类型,导致计算值的精度丢失,请问这是bug吗?