Hi,

I am having trouble getting watermarks to work correctly. The setup:
- A Flink job reads from a Kafka topic, deserializes the records with
Protobuf, applies a sliding event-time window (size 120 seconds, slide
30 seconds), sums up the values, and finally emits the result.
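
For reference, this is how I understand the window arithmetic for a (120 seconds, 30 seconds) sliding window: each event falls into four overlapping windows, and a window can only fire once the watermark reaches the window's last millisecond. The following is a plain-Java sketch of that arithmetic, not Flink code. The class and method names are hypothetical, and it assumes windows aligned to the epoch (no offset) and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; not Flink code, all names are hypothetical.
class SlidingWindowSketch {

    // Returns the start timestamps (ms) of every sliding window containing `timestamp`,
    // assuming epoch-aligned windows (no offset) and a non-negative timestamp.
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slideMs);
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // A window [start, start + sizeMs) can fire once the watermark reaches
    // its last timestamp, start + sizeMs - 1.
    static boolean canFire(long windowStart, long sizeMs, long watermark) {
        return watermark >= windowStart + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 120_000L;
        long slide = 30_000L;
        long eventTime = 1_000_000L; // arbitrary example timestamp in ms
        for (long start : windowStarts(eventTime, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

So with this window definition nothing should be emitted until the watermark has advanced past the end of the earliest window containing an event.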

The full job is pasted at the end of this message.

The problem is that nothing ever reaches the sink. I can see the timestamp
assigner being invoked when events arrive, but beyond that neither the
process function nor the sink function is ever called, even though I am
producing events regularly. I can't figure out how to debug this.
Please help.
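
One thing that might be worth checking, separately from the job itself: a downstream operator with several parallel inputs advances its event-time clock to the minimum of the watermarks it has received from them. The plain-Java sketch below (not Flink code, hypothetical names) simulates that rule for five parallel subtasks. It assumes, without this having been verified for the job, that some subtasks never see an event, for example because the topic has fewer partitions than the configured parallelism.

```java
import java.util.Arrays;

// Plain-Java illustration only; not Flink code, all names are hypothetical.
class WatermarkSketch {

    // Every parallel subtask starts with the lowest possible watermark.
    static long[] initialWatermarks(int parallelism) {
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        return watermarks;
    }

    // An ascending-timestamp assigner emits watermark = highest timestamp seen - 1.
    static void onEvent(long[] watermarks, int subtask, long timestampMs) {
        watermarks[subtask] = Math.max(watermarks[subtask], timestampMs - 1);
    }

    // The downstream operator's event-time clock is the minimum over all inputs.
    static long downstreamWatermark(long[] watermarks) {
        return Arrays.stream(watermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        long[] watermarks = initialWatermarks(5);
        // Assumption for the demo: only subtasks 0..2 ever receive events.
        for (int subtask = 0; subtask < 3; subtask++) {
            onEvent(watermarks, subtask, 1_020_000L);
        }
        boolean stuck = downstreamWatermark(watermarks) == Long.MIN_VALUE;
        System.out.println("downstream watermark still at minimum: " + stuck);
    }
}
```

If that is what happens here, the windows would never fire no matter how many events are produced, because the idle subtasks hold the minimum at its initial value.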

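import java.util.Properties;

// Assumption: Event.getEventTime() returns a Protobuf well-known Timestamp.
import com.google.protobuf.Timestamp;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
        kafkaConsumerProps.setProperty("group.id", "{group_id}");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(100);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(5);
        env.setParallelism(5);

        // Kafka source for the "auth" topic; EventiSchema deserializes the Protobuf payload.
        SingleOutputStreamOperator<Eventi.Event> texStream = env
                .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps))
                .setParallelism(5)
                .setMaxParallelism(5);

        // Sliding event-time window: size 120 seconds, slide 30 seconds.
        SlidingEventTimeWindows window =
                SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));

        texStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
                    @Override
                    public long extractAscendingTimestamp(Eventi.Event element) {
                        // Convert the event time from seconds to milliseconds.
                        return element.getEventTime().getSeconds() * 1000;
                    }
                })
                .keyBy(Eventi.Event::getEventTime)
                .window(window)
                .process(new ProcessWindowFunction<Eventi.Event, Object, Timestamp, TimeWindow>() {
                    @Override
                    public void process(Timestamp timestamp, Context context,
                            Iterable<Eventi.Event> elements, Collector<Object> out) throws Exception {
                        // Counts the elements in the window.
                        int sum = 0;
                        for (Eventi.Event element : elements) {
                            sum++;
                        }
                        out.collect(sum);
                    }
                })
                .print();

        env.execute();
    }
}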
