抄了下 
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestSequentialReadingStreamOperator.java
 , 可以达到串行的效果了:

       DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
       DataStream snapshotStream = 
env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
       DataStream tableStream = snapshotStream.connect(binlogstream);
       tableStream.transform(“Concat”, new TypeHint<….>(){},   new 
SequentialReadingStreamOperator<>());

但是如果打开 checkpointing,那么 flink 会报错:java.lang.UnsupportedOperationException: 
Checkpointing is currently not supported for operators that implement 
InputSelectable

跪了,跪了。。。


On 2020/4/7, 4:45 PM, "刘宇宝" <liuyu...@yingmi.cn> wrote:

    你这是个新思路,分成两个 job,但是感觉不太值当,或许这里是 Flink目前 
API或者说编程模型很受限的地方,我只是源头数据来自两个地方,要合并下两个数据源,所有下游处理都是一样的。如果按照 actor 的松散模式,我是可以在两个 
SourceActor 之间协调的,一个 SourceActor 发完后,通知另一个 SourceActor 再发,或者启动一个新的 
SourceActor,大家都往同一个下游 actor 发消息。
    
    user@flink 里那个哥们提到 InputSelectable,我还没看明白怎么能用到 DataStream上,似乎它只实现在 
StreamOperator 上:  
https://github.com/apache/flink/search?p=2&q=InputSelectable&unscoped_q=InputSelectable
    
    我目前想到一个笨方法,实现一个 SourceFunction,把 FlinkKafkaConsumerBase 和 JDBCInputFormat 
包到一起,这样可以先把 JDBCInputFormat 数据发完了,再发 FlinkKafkaConsumerBase。 
但是这样做只能单并发,多并发的话需要一个分布式的 barrier,flink 没有内置支持,感觉不是个优美的解决方案。
    
    非常感谢你的解答!
    
    
    
    On 2020/4/7, 4:29 PM, "Jark Wu" <imj...@gmail.com> wrote:
    
        如果你的作业没有 state,那么 全量和增量部分,可以分成两个独立的作业。
        如果你的作业有 state,主要就是设计 state 复用的问题。两个作业的拓扑结构需要确保一致(为了复用 savepoint),一个作业的
        source operator 是 jdbc,另一个 source operator 是 kafka。
        当运行完 jdbc 作业后,对作业触发一次 savepoint,然后用这个 savepoint 恢复 kafka 作业,可以从 earliest
        开始读取(假设作业支持幂等)。
        
        这里的一个问题,主要是如何对 jdbc 作业触发 savepoint,因为 jdbc InputFormat 目前是
        bounded,所以读完后整个作业就结束了,就无法进行 savepoint。
        所以这里可能需要自己修改下源码,让 jdbc source 永远不要结束,但通过日志或者 metric 
或其他方式通知外界数据已经读完(可以开始触发
        savepoint)。
        
        希望这些可以帮助到你。
        
        Best.
        Jark
        
        
        On Tue, 7 Apr 2020 at 16:14, 刘宇宝 <liuyu...@yingmi.cn> wrote:
        
        > 我看了下 streamsql 那个 bootstrap 一段,没看懂怎么处理,我现在已经有 mysql table,算是 
materialized
        > view 了,也有一份
        > Kafka 里的 binlog 数据,问题的麻烦在于怎么「先消费 jdbc table, 消费完后,再切换到 kafka 上」,从 
flink
        > 文档来看,一旦
        > 我把 jdbc table 的流和 kafka 的流加入 env 里,env.execute() 
之后,两个流就同时往下游发数据了——我期望的是
        > jdbc table
        > 的流发完了,才开始发 kafka 的流。
        >
        > 谢谢!
        >
        > On 2020/4/7, 2:16 PM, "Jark Wu" <imj...@gmail.com> wrote:
        >
        >     Hi,
        >
        >     你这里的合并是用join 来做么? 这样的话,会比较耗性能。
        >
        >     一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求 binlog 
是幂等操作的,因为会有多处理一部分的
        >     binlog,没法做到 精确地切换到 kafka offset 上。
        >
        >     另外你也可以参考下 StreamSQL 的 bootstrap 的做法:
        >     https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
        >
        >     Best,
        >     Jark
        >
        >
        >     On Sun, 5 Apr 2020 at 22:48, 刘宇宝 <liuyu...@yingmi.cn> wrote:
        >
        >     > 大家好,
        >     >
        >     > 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 
mysql_server.test.tableA
        > 有一个
        >     > topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
        >     >
        >     >
        >     >   1.  先连接上 Kafka 开始消费 topic “mysql_server.test.tableA”,确保连接成功,记为
        >     > binlog-stream,但是要暂停消费 Kafka;
        >     >   2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为
        > table-stream;
        >     >   3.  合并两个 streams,消费完 table-stream 后再开始消费 binlog-stream,这样可以确保
        > binlog 是
        >     > *后*  应用到某个快照表上。
        >     >
        >     > 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态
        >     > startBinlog,初始值为 false:
        >     >
        >     >   binlog-stream -> waitOperator   ->   sinkOperator
        >     >   table-stream -> notifyOperator -> sinkOperator
        >     >
        >     > 两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态, 等
        >     > table-stream 消费完(不知道怎么判断消费完了。。。),  notifyOperator 修改全局状态,这样
        > binlog-stream
        >     > 就能被继续消费了。
        >     >
        >     > 但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 
会被断开,所以这个做法应该是不行的。
        >     >
        >     > 请教怎么破?
        >     >
        >     > 谢谢!
        >     >
        >     >
        >
        >
        >
        
    
    

回复