如下代码片段: watermarkStrategy = watermarkStrategy.withTimestampAssigner( new SerializableTimestampAssigner<KafkaMessageWrapper<T>>() { @Override public long extractTimestamp(KafkaMessageWrapper<T> element, long recordTimestamp) { try { return element.getData().getTimestamp(); } catch (Exception e) { return 86400_000; } } } ); 为什么这样会导致序列化报错呢。换成如下就不报错:
watermarkStrategy = watermarkStrategy.withTimestampAssigner( (SerializableTimestampAssigner<KafkaMessageWrapper<T>>) (element, recordTimestamp) -> { try { return element.getData().getTimestamp(); } catch (Exception e) { return 86400_000; } } ); source部分设置watermarkStrategy的时候,导致无法序列化。