Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu

> 在 2020年6月29日,12:05,sunfulin  写道:
> 
> 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈?

Hi,
在1.10.x 版本中,upsertSink 中推导 pk 是通过query 来推导,这个比较好的解决是等1.11发布后,通过在建表的DDL声明主键( 
PRIMARY KEY NOT ENFORCED), 如果要在1.10.x里解决,一般是改写下query,使得推导的pk能符合预期。这个写入es的sink要求 
pk 是简单类型,而你的query又需要ROW(c, d) 复合类型, 不太好改写。想到hack一点的方式就是把c,d 
拼接成一个字段c${delimeter}d,ROW(c, d)  用UDF构造,感觉这种也比较绕。如果业务上不是强需求ROW(c, 
d),又等不及1.11的话,可以在ES里多加一列就好了。

祝好,
Leonard Xu

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, Leonard
这个写法应该是OK,不过我的场景下是下面这种
select a, b, row(commentId, commentContent) from T
group by a, b, commentId, commentContent
这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈?











在 2020-06-29 10:19:31,"Leonard Xu"  写道:
>Hi,
>异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 
>ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 
>UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 
>字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys 
>,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。
>
>你可以试下下面的query,query keys  对应es中的 id  就是 
>commentId${keyDelimiter}commentContent, 这也应该是你需要的结果 
>Select ROW(commentId, commentContent) from T
>group by commentId, commentContent
>
>祝好,
>Leonard Xu
>
>> 在 2020年6月28日,22:33,sunfulin  写道:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> hi, 
>> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner
>> 
>> 
>> org.apache.flink.table.api.ValidationException: Only simple types that can 
>> be safely converted into a string representation can be used as keys. But 
>> was: Row(commentId: String, commentContent: String)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at 
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-28 10:15:34,"Benchao Li"  写道:
>>> Hi,
>>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>>> 能附上异常栈就更好啦。
>>> 
>>> sunfulin  于2020年6月25日周四 下午4:35写道:
>>> 
 Hi,
 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Best,
>>> Benchao Li


Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu
Hi,
异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 
ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 
UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 
字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys 
,复杂类型不支持,复杂类型toString()的结果可能不是我们想要的。

你可以试下下面的query,query keys  对应es中的 id  就是 commentId${keyDelimiter}commentContent, 
这也应该是你需要的结果 
Select ROW(commentId, commentContent) from T
group by commentId, commentContent

祝好,
Leonard Xu

> 在 2020年6月28日,22:33,sunfulin  写道:
> 
> 
> 
> 
> 
> 
> 
> hi, 
> 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner
> 
> 
> org.apache.flink.table.api.ValidationException: Only simple types that can be 
> safely converted into a string representation can be used as keys. But was: 
> Row(commentId: String, commentContent: String)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at 
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-28 10:15:34,"Benchao Li"  写道:
>> Hi,
>> 我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>> 能附上异常栈就更好啦。
>> 
>> sunfulin  于2020年6月25日周四 下午4:35写道:
>> 
>>> Hi,
>>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>> 
>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li



Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin






hi, 
谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner


org.apache.flink.table.api.ValidationException: Only simple types that can be 
safely converted into a string representation can be used as keys. But was: 
Row(commentId: String, commentContent: String)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)















在 2020-06-28 10:15:34,"Benchao Li"  写道:
>Hi,
>我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
>能附上异常栈就更好啦。
>
>sunfulin  于2020年6月25日周四 下午4:35写道:
>
>> Hi,
>> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
>> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: flink sql row类型group by

2020-06-27 文章 Benchao Li
Hi,
我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
能附上异常栈就更好啦。

sunfulin  于2020年6月25日周四 下午4:35写道:

> Hi,
> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?



-- 

Best,
Benchao Li


flink sql row类型group by

2020-06-25 文章 sunfulin
Hi,
请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?