Hi, > 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); > 还要发送一次SchemaChangeEvent呢?
Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 writer,参考 DorisEventSerializer > 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release > upstream的呢? 被 block 的原因是 responseFuture没有 complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 在没有完成时会 block 住。 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 waitFlushSuccess的responseFuture 标记为 complete。 参考 SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150. 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。 Best, Jiabao [1] https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit > 2023年12月27日 22:14,casel.chen <casel_c...@126.com> 写道: > > 看了infoq介绍flink cdc 3.0文章 > https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. > evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。 > 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, > d6 其中d代表数据变更,s代表schema变更 > 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。 > 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, > 而Task2处理 d3, d4, d5, s3, d6 > 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢? > > > SchemaOperator代码中 > private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent > schemaChangeEvent) { > // The request will need to send a FlushEvent or block until flushing > finished > SchemaChangeResponse response = requestSchemaChange(tableId, > schemaChangeEvent); > if (response.isShouldSendFlushEvent()) { > LOG.info( > "Sending the FlushEvent for table {} in subtask {}.", > tableId, > getRuntimeContext().getIndexOfThisSubtask()); > output.collect(new StreamRecord<>(new FlushEvent(tableId))); > output.collect(new StreamRecord<>(schemaChangeEvent)); > // The request will block until flushing finished in each sink > writer > requestReleaseUpstream(); > } > } > 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); > 还要发送一次SchemaChangeEvent呢? > 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么? > 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release > upstream的呢? > 求指教,谢谢! >