一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? 去掉kafka sink ,看下 写入效果。 再对比下 加入kafka 后的效果。
一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 guoliubi...@foxmail.com <guoliubi...@foxmail.com> 于2020年12月18日周五 下午2:01写道: > Hi, > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > .process(new ProcessFunction<RatioValue, RatioValue>() { > @Override > public void processElement(RatioValuevalue, Context ctx, > Collector<RatioValue> out) throws Exception { > out.collect(value); > ctx.output(ratioOutputTag, value); > } > }); > sideStream.addSink(new FlinkKafkaProducer<>( > "ratio_value", > new RatioValueSerializationSchema(suffix), > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > tool.get(SCHEMA_REGISTRY_URL)), > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > DataStream<RatioValue> ratioSideStream = > sideStream.getSideOutput(ratioOutputTag); > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > 想问下这种情况是否有什么排查手段? > > > guoliubi...@foxmail.com >