KafkaIO is translated using SDF by default. There were some changes to how watermarks are emitted from Impulse (which is the SDF "starting point"), and these might affect what you see. Please try one of the following:

 a) use --shutdownSourcesAfterIdleMs (e.g. 5000) which should trigger watermark from the Impulse, though it might have some other unwanted consequences, or

 b) try upgrading to 2.53.0, which had some fixes around that

There were some other fixes around handling IO, so please consider upgrading directly to 2.55.0, or at least use --autoWatermarkInterval (e.g. 100) for your Pipeline.
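
If it is easier to set the flags from code than from the command line, something like the following might work. It is only a sketch under some assumptions: that the pipeline runs on the Flink runner (both flags are Flink runner options), and that the values 5000 and 100 from above suit your job. The class FlinkWatermarkArgs and its method are hypothetical helpers, not part of Beam; they only append the two flags to the arguments unless they were already given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: appends the two flags mentioned
// above to the command-line arguments unless the caller already set them.
class FlinkWatermarkArgs {

    static String[] withWatermarkFlags(String[] args, long idleMs, long intervalMs) {
        List<String> result = new ArrayList<>(Arrays.asList(args));
        addIfAbsent(result, "--shutdownSourcesAfterIdleMs", idleMs);
        addIfAbsent(result, "--autoWatermarkInterval", intervalMs);
        return result.toArray(new String[0]);
    }

    private static void addIfAbsent(List<String> args, String flag, long value) {
        for (String arg : args) {
            if (arg.startsWith(flag + "=")) {
                return; // keep the value given on the command line
            }
        }
        args.add(flag + "=" + value);
    }

    public static void main(String[] args) {
        // Example values taken from the suggestions above; tune them for your job.
        String[] effective = withWatermarkFlags(args, 5000L, 100L);
        // In the pipeline's main(), the result would be passed on as:
        //   PipelineOptionsFactory.fromArgs(effective).withValidation().create()
        System.out.println(String.join(" ", effective));
    }
}
```

A value passed explicitly on the command line wins over the defaults here, so the helper should not get in the way when experimenting with different intervals.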

Hope this helps,

 Jan

On 3/27/24 13:51, Sigalit Eliazov wrote:
Hi,
this is the pipeline, a very simple one.
The onTimer is not fired.
We are not using any experimental flags.

public class KafkaBeamPipeline {

    static class ProcessMessageFn extends DoFn<KV<String, String>, String> {
        @StateId("count")
        private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

        @TimerId("eventTimer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(ProcessContext context, @StateId("count") ValueState<Integer> state, @TimerId("eventTimer") Timer timer) {
            Integer count = state.read();
            //some logic
            state.write(count);

            // Set a timer for one minute later
            timer.set(context.timestamp().plus(60000));
            context.output("Current count: " + count);
        }

        @OnTimer("eventTimer")
        public void onTimer(OnTimerContext context, @StateId("count") ValueState<Integer> state) {
            state.write(0);
            System.out.println("Timer fired at " + context.timestamp());
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        pipeline
            .apply(KafkaIO.<String, String>read()
                    .withBootstrapServers("localhost:9092")
                    .withTopic("topic")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(ParDo.of(new ProcessMessageFn()));

        pipeline.run().waitUntilFinish();
    }
}

thanks
Sigalit

On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    what is your runner, is it Flink as in the issue? What is the
    source of your Pipeline? Do you use any additional flags, e.g.
    --experiments? Do you see this with the classical or the portable runner?

      Jan

    On 3/26/24 19:18, Sigalit Eliazov wrote:
    > Hi all
    > We encountered an issue with timers starting from version 2.52.
    >
    > We saw that the timers are not triggered.
    >
    > https://github.com/apache/beam/issues/29816
    >
    > Did someone encounter such problems as well?
    >
    > Thanks
    > Sigalit
    >
    >
    >
