KafkaIO is translated using SDF by default. There were some changes how
watermarks are emitted from Impulse (which is what is the SDF "starting
point", which might affect what tou see). Please try one of the following:
a) use --shutdownSourcesAfterIdleMs (e.g. 5000) which should trigger
watermark from the Impulse, though it might have some other unwanted
consequences, or
b) try upgrading to 2.53.0, which had some fixes around that
There were some other fixes around handling IO, so please consider
upgrade directly to 2.55.0 or at least use --autoWatermarkInterval (e.g.
100) for your Pipeline.
Hope this helps,
Jan
On 3/27/24 13:51, Sigalit Eliazov wrote:
hi,
this is the pipeline, very simple one
the onTimer is not fired.
We are not using any experimental variables.
public class KafkaBeamPipeline {
static class ProcessMessageFn extends DoFn<KV<String, String>,
String> {
@StateId("count")
private final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value();
@TimerId("eventTimer")
private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(ProcessContext context,
@StateId("count") ValueState<Integer> state, @TimerId("eventTimer")
Timer timer) {
Integer count = state.read();
//some logic
state.write(count);
// Set a timer for one minute later
timer.set(context.timestamp().plus(60000));
context.output("Current count: " + count);
}
@OnTimer("eventTimer")
public void onTimer(OnTimerContext context, @StateId("count")
ValueState<Integer> state) {
state.write(0);
System.out.println("Timer fired at " + context.timestamp());
}
}
public static void main(String[] args) {
Pipeline pipeline =
Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
pipeline
.apply(KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
.apply(ParDo.of(new ProcessMessageFn()));
pipeline.run().waitUntilFinish();
}
}
thanks
Sigalit
On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
what is your runner, is it Flink as well in the issue? What is the
source of your Pipeline? Do you use some additional flags, e.g.
--experiments? Do you see that using classical or portable runner?
Jan
On 3/26/24 19:18, Sigalit Eliazov wrote:
> Hi all
> We encountered issue with timers starting from version 2.52.
>
> We saw that the timers are not triggered.
>
> https://github.com/apache/beam/issues/29816
>
> Did someone encounter such problems as well?
>
> Thanks
> Sigalit
>
>
>