Hi

????????????????????????????,???????????????? wm > window.end_time 
????????????????????????????,?????? wm 
????????????????,????????????????????????????


Best
JasonLee


??2021??10??12?? 11:26??kcz<573693...@qq.com.INVALID> ??????
???????? 
times??????????????????????????????????????????????????????+20??????????????StreamExecutionEnvironment
 env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String&gt; kafkaSource = KafkaSource.<String&gt;builder()
.setBootstrapServers("127.0.0.1:9092")
.setTopics("user_behavior")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();


DataStream<JSONObject&gt; ds = env.fromSource(kafkaSource, WatermarkStrategy
.<String&gt;forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
 MyTimeAssigner("times")), "Kafka Source")
.map(JSONObject::parseObject);ds.print();

Reply via email to