Re: 请教flink cep如何对无序数据处理
[image: image.png] 这样可以不? sherlock zw 于2021年5月14日周五 上午8:52写道: > 兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛? > 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件 >
Re: FlinKCEP
eventTime 和 processingTime 都支持的, 可以检查一下是否匹配的逻辑或者 proceccFunction 有问题 lp <973182...@qq.com> 于2021年5月14日周五 下午2:42写道: > 请教下,flinkCEP只能用在eventTime 模式下吗,因为我发现写了个cep程序,申明采用processingTime,没有数据发出 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
Hi: 经过验证,在flink-1.12.0的 HBase connector中,在老版本的TableFactory即HBase2TableFactory下, 即使是在目标表的DDL中声明PK,也还是会出现UpsertStreamTableSink requires that Table has a full primary keys if it is updated.异常 而在source表的DDL中声明PK,该问题可以解决。想想有些不合理。 祝好! automths On 05/14/2021 15:55,Leonard Xu wrote: 这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。 你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到 Factory Context 中schema 的 pk 后,需要按照 upsert 语义处理下。 祝好, Leonard On May 14, 2021, at 15:39, automths wrote: Hi: 该问题有进一步的进展了。 我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。 现在有一点弄不明白,在查找primary key的时候,是去查找source表的primary key吗? 源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下: def getUniqueKeyForUpsertSink( sinkNode: LegacySink, planner: PlannerBase, sink: UpsertStreamTableSink[_]): Option[Array[String]] = { // extract unique key fields // Now we pick shortest one to sink // TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]] val sinkFieldNames = sink.getTableSchema.getFieldNames /** Extracts the unique keys of the table produced by the plan. */ val fmq = FlinkRelMetadataQuery.reuseOrCreate( planner.getRelBuilder.getCluster.getMetadataQuery) val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary key吗? if (uniqueKeys != null && uniqueKeys.size() > 0) { uniqueKeys .filter(_.nonEmpty) .map(_.toArray.map(sinkFieldNames)) .toSeq .sortBy(_.length) .headOption } else { None } } flink 版本为1.12.0。 在1.13.0上也有出现。 请知道的大佬告知。 祝好! automths On 05/14/2021 11:00,automths wrote: Hi: 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常: Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。 我的flink版本是flink-1.12.0的。 请教一下,这个问题,该怎么解决? 祝好! automths
Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
Hi: 结果表上是声明pk的,但是source表没有申请pk,但是还是会抛出下面邮件中说的异常。 我把source表加上pk申明,该异常才没了。 我自定义的sink 就是参考 HBaseUpsertTableSink 来实现的。 我再去用一下HBaseUpsertTableSink 感受一下。 祝好! automths On 05/14/2021 15:55,Leonard Xu wrote: 这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。 你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到 Factory Context 中schema 的 pk 后,需要按照 upsert 语义处理下。 祝好, Leonard On May 14, 2021, at 15:39, automths wrote: Hi: 该问题有进一步的进展了。 我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。 现在有一点弄不明白,在查找primary key的时候,是去查找source表的primary key吗? 源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下: def getUniqueKeyForUpsertSink( sinkNode: LegacySink, planner: PlannerBase, sink: UpsertStreamTableSink[_]): Option[Array[String]] = { // extract unique key fields // Now we pick shortest one to sink // TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]] val sinkFieldNames = sink.getTableSchema.getFieldNames /** Extracts the unique keys of the table produced by the plan. */ val fmq = FlinkRelMetadataQuery.reuseOrCreate( planner.getRelBuilder.getCluster.getMetadataQuery) val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary key吗? if (uniqueKeys != null && uniqueKeys.size() > 0) { uniqueKeys .filter(_.nonEmpty) .map(_.toArray.map(sinkFieldNames)) .toSeq .sortBy(_.length) .headOption } else { None } } flink 版本为1.12.0。 在1.13.0上也有出现。 请知道的大佬告知。 祝好! automths On 05/14/2021 11:00,automths wrote: Hi: 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常: Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。 我的flink版本是flink-1.12.0的。 请教一下,这个问题,该怎么解决? 祝好! automths
Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。 你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到 Factory Context 中schema 的 pk 后,需要按照 upsert 语义处理下。 祝好, Leonard > On May 14, 2021, at 15:39, automths wrote: > > Hi: > > 该问题有进一步的进展了。 > > > 我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。 > > > 现在有一点弄不明白,在查找primary key的时候,是去查找source表的primary key吗? > 源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下: > > > def getUniqueKeyForUpsertSink( >sinkNode: LegacySink, >planner: PlannerBase, >sink: UpsertStreamTableSink[_]): Option[Array[String]] = { > // extract unique key fields > // Now we pick shortest one to sink > // TODO UpsertStreamTableSink setKeyFields interface should be > Array[Array[String]] > val sinkFieldNames = sink.getTableSchema.getFieldNames > /** Extracts the unique keys of the table produced by the plan. */ > val fmq = FlinkRelMetadataQuery.reuseOrCreate( >planner.getRelBuilder.getCluster.getMetadataQuery) > val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary > key吗? > if (uniqueKeys != null && uniqueKeys.size() > 0) { >uniqueKeys >.filter(_.nonEmpty) >.map(_.toArray.map(sinkFieldNames)) >.toSeq >.sortBy(_.length) >.headOption > } else { >None > } > } > > > > > flink 版本为1.12.0。 > 在1.13.0上也有出现。 > > > 请知道的大佬告知。 > > > > > 祝好! > automths > > > > > On 05/14/2021 11:00,automths wrote: > Hi: > > 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常: > Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。 > 我的flink版本是flink-1.12.0的。 > > > 请教一下,这个问题,该怎么解决? > > > > > 祝好! > automths > > >
Re:UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
Hi: 该问题有进一步的进展了。 我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。 现在有一点弄不明白,在查找primary key的时候,是去查找source表的primary key吗? 源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下: def getUniqueKeyForUpsertSink( sinkNode: LegacySink, planner: PlannerBase, sink: UpsertStreamTableSink[_]): Option[Array[String]] = { // extract unique key fields // Now we pick shortest one to sink // TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]] val sinkFieldNames = sink.getTableSchema.getFieldNames /** Extracts the unique keys of the table produced by the plan. */ val fmq = FlinkRelMetadataQuery.reuseOrCreate( planner.getRelBuilder.getCluster.getMetadataQuery) val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary key吗? if (uniqueKeys != null && uniqueKeys.size() > 0) { uniqueKeys .filter(_.nonEmpty) .map(_.toArray.map(sinkFieldNames)) .toSeq .sortBy(_.length) .headOption } else { None } } flink 版本为1.12.0。 在1.13.0上也有出现。 请知道的大佬告知。 祝好! automths On 05/14/2021 11:00,automths wrote: Hi: 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常: Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。 我的flink版本是flink-1.12.0的。 请教一下,这个问题,该怎么解决? 祝好! automths
sideoutput sql语法支持
当前通过flink的side output可以方便的将一条stream划分成两条,但是当想使用SQL的方式实现这个功能。目前社区有做这个的打算吗?还是已经支持了? -- Sent from: http://apache-flink.147419.n8.nabble.com/
FlinKCEP
请教下,flinkCEP只能用在eventTime 模式下吗,因为我发现写了个cep程序,申明采用processingTime,没有数据发出 -- Sent from: http://apache-flink.147419.n8.nabble.com/