Hi Dinesh, thanks for your reply.

For example, suppose there are two topics: topic A produces 1 record per second and topic B produces 3600 records per second. If I set the Kafka consumer config to max.poll.records = "3600" and max.poll.interval.ms = "1000", I can consume all new records from both topics every second, in real time.

But if I want to consume data from yesterday or earlier by using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), then within one second I will get 3600 records from topic A, which were produced over an hour in the production environment, and at the same time 3600 records from topic B, which were produced within a single second. So with EventTime semantics, the watermark driven by topic A will always cause the data from topic B to be treated as 'late data' in the window operator.

What I want is to get 1 record from A and 3600 records from B per second when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so that I can simulate consuming the data at the same pace as in the real production environment.
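To make the setup concrete, here is roughly the test job I have in mind (a minimal sketch against the Flink 1.10-style FlinkKafkaConsumer API; the broker address, topic names, timestamp parsing and key field are placeholders, not my real job):

import java.util.Properties;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReplayPaceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "replay-test");
        // The batch size discussed above: up to 3600 records per poll.
        props.setProperty("max.poll.records", "3600");
        props.setProperty("max.poll.interval.ms", "1000");

        // Replay from yesterday: both consumers start at the same timestamp.
        long replayStartMs = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;

        FlinkKafkaConsumer<String> consumerA =
                new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
        consumerA.setStartFromTimestamp(replayStartMs);

        FlinkKafkaConsumer<String> consumerB =
                new FlinkKafkaConsumer<>("topicB", new SimpleStringSchema(), props);
        consumerB.setStartFromTimestamp(replayStartMs);

        // Union first, then assign timestamps/watermarks on the combined stream,
        // as in the original pipeline. During replay, one poll of topic A covers
        // roughly an hour of event time while one poll of topic B covers about a
        // second, so the shared watermark races ahead of topic B's records.
        DataStream<String> unioned = env
                .addSource(consumerA)
                .union(env.addSource(consumerB))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                            @Override
                            public long extractTimestamp(String record) {
                                // Placeholder: assume the event time is the first
                                // comma-separated field of the record.
                                return Long.parseLong(record.split(",")[0]);
                            }
                        });

        unioned
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String record) {
                        return record.split(",")[1]; // placeholder key field
                    }
                })
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String a, String b) {
                        return a + "|" + b;
                    }
                })
                .print();

        env.execute("replay-pace-test");
    }
}

Running this against yesterday's data shows the problem: the watermark driven by topic A jumps ahead, and topic B's records are dropped as late by the window operator.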
Best

On Sat, 23 May 2020 at 23:42, C DINESH <dinesh.kitt...@gmail.com> wrote:

> Hi Jary,
>
> What do you mean by "step balance"? Could you please provide a concrete
> example?
>
> On Fri, May 22, 2020 at 3:46 PM Jary Zhen <jaryz...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> First, a brief pipeline introduction:
>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>   consume multi kafka topic
>>     -> union them
>>     -> assignTimestampsAndWatermarks
>>     -> keyby
>>     -> window() and so on ...
>> It's a very normal way to use Flink to process data like this in a
>> production environment.
>> But if I want to test the pipeline above, I need to use the API
>> FlinkKafkaConsumer.setStartFromTimestamp(timestamp) to consume 'past' data.
>> So my question is how to control the 'step' balance, as one topic
>> produces 3 records per second while another topic produces 30000 per second.
>>
>> I don't know if I described this clearly, so if anything is unclear
>> please let me know.
>>
>> Thanks
>>