Re: 请教flink cep如何对无序数据处理

2021-05-14 文章 Peihui He
[image: image.png]

这样可以不?

sherlock zw  于2021年5月14日周五 上午8:52写道:

> 兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛?
> 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件
>


Re: FlinKCEP

2021-05-14 文章 yue ma
eventTime 和 processingTime 都支持的, 可以检查一下是否匹配的逻辑或者 proceccFunction 有问题

lp <973182...@qq.com> 于2021年5月14日周五 下午2:42写道:

> 请教下,flinkCEP只能用在eventTime 模式下吗,因为我发现写了个cep程序,申明采用processingTime,没有数据发出
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-14 文章 automths
Hi:

经过验证,在flink-1.12.0的 HBase connector中,在老版本的TableFactory即HBase2TableFactory下,
即使是在目标表的DDL中声明PK,也还是会出现UpsertStreamTableSink requires that Table has a full 
primary keys if it is updated.异常
而在source表的DDL中声明PK,该问题可以解决。想想有些不合理。




祝好!
automths




On 05/14/2021 15:55,Leonard Xu wrote:
这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。
你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到  Factory 
Context 中schema 的 pk 后,需要按照 upsert 语义处理下。

祝好,
Leonard




On May 14, 2021, at 15:39, automths  wrote:

Hi:

该问题有进一步的进展了。


我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。


现在有一点弄不明白,在查找primary  key的时候,是去查找source表的primary key吗?
源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下:


def getUniqueKeyForUpsertSink(
sinkNode: LegacySink,
planner: PlannerBase,
sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
// extract unique key fields
// Now we pick shortest one to sink
// TODO UpsertStreamTableSink setKeyFields interface should be 
Array[Array[String]]
val sinkFieldNames = sink.getTableSchema.getFieldNames
/** Extracts the unique keys of the table produced by the plan. */
val fmq = FlinkRelMetadataQuery.reuseOrCreate(
planner.getRelBuilder.getCluster.getMetadataQuery)
val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary 
key吗?
if (uniqueKeys != null && uniqueKeys.size() > 0) {
uniqueKeys
.filter(_.nonEmpty)
.map(_.toArray.map(sinkFieldNames))
.toSeq
.sortBy(_.length)
.headOption
} else {
None
}
}




flink 版本为1.12.0。
在1.13.0上也有出现。


请知道的大佬告知。




祝好!
automths




On 05/14/2021 11:00,automths wrote:
Hi:

我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)


我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
我的flink版本是flink-1.12.0的。


请教一下,这个问题,该怎么解决?




祝好!
automths






Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-14 文章 automths
Hi:


结果表上是声明pk的,但是source表没有申请pk,但是还是会抛出下面邮件中说的异常。


我把source表加上pk申明,该异常才没了。
我自定义的sink 就是参考 HBaseUpsertTableSink 来实现的。


我再去用一下HBaseUpsertTableSink 感受一下。










祝好!
automths




On 05/14/2021 15:55,Leonard Xu wrote:
这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。
你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到  Factory 
Context 中schema 的 pk 后,需要按照 upsert 语义处理下。

祝好,
Leonard




On May 14, 2021, at 15:39, automths  wrote:

Hi:

该问题有进一步的进展了。


我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。


现在有一点弄不明白,在查找primary  key的时候,是去查找source表的primary key吗?
源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下:


def getUniqueKeyForUpsertSink(
sinkNode: LegacySink,
planner: PlannerBase,
sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
// extract unique key fields
// Now we pick shortest one to sink
// TODO UpsertStreamTableSink setKeyFields interface should be 
Array[Array[String]]
val sinkFieldNames = sink.getTableSchema.getFieldNames
/** Extracts the unique keys of the table produced by the plan. */
val fmq = FlinkRelMetadataQuery.reuseOrCreate(
planner.getRelBuilder.getCluster.getMetadataQuery)
val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary 
key吗?
if (uniqueKeys != null && uniqueKeys.size() > 0) {
uniqueKeys
.filter(_.nonEmpty)
.map(_.toArray.map(sinkFieldNames))
.toSeq
.sortBy(_.length)
.headOption
} else {
None
}
}




flink 版本为1.12.0。
在1.13.0上也有出现。


请知道的大佬告知。




祝好!
automths




On 05/14/2021 11:00,automths wrote:
Hi:

我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)


我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
我的flink版本是flink-1.12.0的。


请教一下,这个问题,该怎么解决?




祝好!
automths






Re: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-14 文章 Leonard Xu
这里说的 PK 是定义在你结果表 DDL 上的PK,最开始的报错信息应该是你结果表上没声明PK吧。
你自定义的 connector 支持 upsert 的话,参考下 HBaseUpsertTableSink 的实现,你的 sink 获取到  Factory 
Context 中schema 的 pk 后,需要按照 upsert 语义处理下。

祝好,
Leonard




> On May 14, 2021, at 15:39, automths  wrote:
> 
> Hi:
> 
> 该问题有进一步的进展了。
> 
> 
> 我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。
> 
> 
> 现在有一点弄不明白,在查找primary  key的时候,是去查找source表的primary key吗?
> 源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下:
> 
> 
> def getUniqueKeyForUpsertSink(
>sinkNode: LegacySink,
>planner: PlannerBase,
>sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
>  // extract unique key fields
>  // Now we pick shortest one to sink
>  // TODO UpsertStreamTableSink setKeyFields interface should be 
> Array[Array[String]]
>  val sinkFieldNames = sink.getTableSchema.getFieldNames
>  /** Extracts the unique keys of the table produced by the plan. */
>  val fmq = FlinkRelMetadataQuery.reuseOrCreate(
>planner.getRelBuilder.getCluster.getMetadataQuery)
>  val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary 
> key吗?
>  if (uniqueKeys != null && uniqueKeys.size() > 0) {
>uniqueKeys
>.filter(_.nonEmpty)
>.map(_.toArray.map(sinkFieldNames))
>.toSeq
>.sortBy(_.length)
>.headOption
>  } else {
>None
>  }
> }
> 
> 
> 
> 
> flink 版本为1.12.0。
> 在1.13.0上也有出现。
> 
> 
> 请知道的大佬告知。
> 
> 
> 
> 
> 祝好!
> automths
> 
> 
> 
> 
> On 05/14/2021 11:00,automths wrote:
> Hi:
> 
> 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> UpsertStreamTableSink requires that Table has a full primary keys if it is 
> updated.
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 
> 
> 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
> 我的flink版本是flink-1.12.0的。
> 
> 
> 请教一下,这个问题,该怎么解决?
> 
> 
> 
> 
> 祝好!
> automths
> 
> 
> 



Re:UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-14 文章 automths
Hi:

该问题有进一步的进展了。


我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。


现在有一点弄不明白,在查找primary  key的时候,是去查找source表的primary key吗?
源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下:


def getUniqueKeyForUpsertSink(
sinkNode: LegacySink,
planner: PlannerBase,
sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
  // extract unique key fields
  // Now we pick shortest one to sink
  // TODO UpsertStreamTableSink setKeyFields interface should be 
Array[Array[String]]
  val sinkFieldNames = sink.getTableSchema.getFieldNames
  /** Extracts the unique keys of the table produced by the plan. */
  val fmq = FlinkRelMetadataQuery.reuseOrCreate(
planner.getRelBuilder.getCluster.getMetadataQuery)
  val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary 
key吗?
  if (uniqueKeys != null && uniqueKeys.size() > 0) {
uniqueKeys
.filter(_.nonEmpty)
.map(_.toArray.map(sinkFieldNames))
.toSeq
.sortBy(_.length)
.headOption
  } else {
None
  }
}




flink 版本为1.12.0。
在1.13.0上也有出现。


请知道的大佬告知。




祝好!
automths




On 05/14/2021 11:00,automths wrote:
Hi:

我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)


我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
我的flink版本是flink-1.12.0的。


请教一下,这个问题,该怎么解决?




祝好!
automths





sideoutput sql语法支持

2021-05-14 文章 Yu Xia
当前通过flink的side
output可以方便的将一条stream划分成两条,但是当想使用SQL的方式实现这个功能。目前社区有做这个的打算吗?还是已经支持了?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

FlinKCEP

2021-05-14 文章 lp
请教下,flinkCEP只能用在eventTime 模式下吗,因为我发现写了个cep程序,申明采用processingTime,没有数据发出



--
Sent from: http://apache-flink.147419.n8.nabble.com/