试了下一种解决方案,如下,可以调整sql并行度。 val table1: Table = stenv.sqlQuery("select * from test") val schema = table1.getSchema
val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item => Row.of(item.getField(0), item.getField(1)))(new RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray, schema.getFieldNames.toList.take(2).toArray)).setParallelism(2)) Peihui He <peihu...@gmail.com> 于2020年10月14日周三 上午11:52写道: > hello, > > stenv.fromDataStream(stream, $"") > > 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode > 类型,field应该如何设置呢? > 比如: > { > a: 1, > b: { > c: "test" > } > } > > Best Wishes. > > shizk233 <wangwangdaxian...@gmail.com> 于2020年9月28日周一 下午7:15写道: > >> flink sql似乎不能设置rebalance,在Data Stream API可以设。 >> >> 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 >> >> 另一种思路就是kafka topic增加一下分区 >> >> Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道: >> >> > 你好! 使用flink >> > SQL,如何设置rebalance呢?------------------ 原始邮件 ------------------ >> > 发件人: "zilong&nbsp;xiao"<acidzz...@gmail.com> >> > 发送时间: 2020年9月27日(星期天) 晚上6:27 >> > 收件人: "user-zh"<user-zh@flink.apache.org>; >> > 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 >> >