Hi Zhou Zach: 你可以试试 env.disableOperatorChaining(); 然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。 > 我是怎么设置参数的 我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样 tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, null)); 同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
Best forideal 在 2020-08-13 15:20:13,"Zhou Zach" <wander...@163.com> 写道: > > > >Hi forideal, >我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: > > > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > > streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamExecutionEnv.setStateBackend(new > RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints")) > > val blinkEnvSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, > blinkEnvSettings) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20)) > > streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900)) > > > streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s") > > >并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下) >在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的 > > > > > > > > > > > > > > >在 2020-08-13 14:02:58,"forideal" <fszw...@163.com> 写道: >>大家好 >> >> 问题的原因定位到了。 >> 由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。 >> 这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op >> chain 在一起,不能确定到底是那个环节存在问题) >> 发现在 WatermarkAssigner(rowtime=[event_time], >> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source >> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark >> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。 >> 定位到由于 kafka 部分 partition 无数据导致 No watermark 加上 >> table.exec.source.idle-timeout = 10s 参数即可。 >> 当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 >> watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op >> 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。 >> >> >>Best forideal >> >> >> >> >> >> >> >> >>在 2020-08-13 12:56:57,"forideal" <fszw...@163.com> 写道: >>>大家好 >>> >>> >>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 >>> StreamExecWatermarkAssigner >>> 在translateToPlanInternal 中生成了如下一个 class 代码, >>>public final class WatermarkGenerator$2 extends >>>org.apache.flink.table.runtime.generated.WatermarkGenerator { public >>>WatermarkGenerator$2(Object[] references) throws Exception { } @Override >>>public void open(org.apache.flink.configuration.Configuration parameters) >>>throws Exception { } @Override public Long >>>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws >>>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean >>>isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp >>>result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { >>>field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 >>>= null; if (!isNull$4) { result$5 = >>>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() >>> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { >>>return null; } else { return result$5.getMillisecond(); } } @Override public >>>void close() throws Exception { } } >>> >>> >>> >>> 其中关键的信息是 result$5 = >>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() >>> - ((long) 10000L), field$3.getNanoOfMillisecond()); >>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 >>>watermark。 >>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark >>>这样的结果。因为这部分codegen的代码确实无法进一步debug了。 >>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 >>> >>> Best forideal >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2020-08-11 17:13:01,"forideal" <fszw...@163.com> 写道: >>>>大家好,请教一个问题 >>>> >>>> >>>> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 >>>> watermark。消费大量的数据的时候,就无法生成watermark。 >>>> 一直是 No Watermark。 暂时找不到排查问题的思路。 >>>> Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 >>>> EventTime mode 模式,Blink Planner。 >>>>| >>>>No Watermark | >>>> SQL如下 >>>> >>>> >>>> DDL: >>>> create table test( >>>> user_id varchar, >>>> action varchar, >>>> event_time TIMESTAMP(3), >>>> WATERMARK FOR event_time AS event_time - INTERVAL >>>> '10' SECOND >>>> ) with(); >>>> >>>> >>>> DML: >>>>insert into >>>> console >>>>select >>>> user_id, >>>> f_get_str(bind_id) as id_list >>>>from >>>> ( >>>> select >>>> action as bind_id, >>>> user_id, >>>> event_time >>>> from >>>> ( >>>> SELECT >>>> user_id, >>>> action, >>>> PROCTIME() as proc_time, >>>> event_time >>>> FROM >>>> test >>>> ) T >>>> where >>>> user_id is not null >>>> and user_id <> '' >>>> and CHARACTER_LENGTH(user_id) = 24 >>>> ) T >>>>group by >>>> SESSION(event_time, INTERVAL '10' SECOND), >>>> user_id >>>> >>>>Best forideal