Hi all,
I'm doing some tests with Beam and Apache Flink. I'm running the code below:
public static void main(String[] args) throws IOException {
  WorkflowStepOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(WorkflowStepOptions.class);

  logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
      options.getKafkaBrokers(), options.getTopics(), options.getOutputTopic(),
      options.getWindowSize(), options.getGroupId(), workflowStepName);

  Pipeline p = Pipeline.create(options);

  CoderRegistry cr = p.getCoderRegistry();
  cr.registerCoderForClass(MyClass.class, new MyClassCoder());

  KafkaIO.Read<Integer, MyClass> kafkaIOReader = KafkaIO.<Integer, MyClass>read()
      .withBootstrapServers(options.getKafkaBrokers())
      .withTopics(Arrays.asList(options.getTopics().split(",")))
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(MyClassEventDeserializer.class)
      //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
      .withTimestampFn((KV<Integer, MyClass> event) ->
          event.getValue().getDate() == null
              ? Instant.now()
              : Instant.parse(event.getValue().getDate(),
                  DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
      .withConsumerConfigUpdates(ImmutableMap.of(
          "group.id", options.getGroupId(),
          "auto.offset.reset", "earliest"));

  KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String, String>write()
      .withBootstrapServers(options.getKafkaBrokers())
      .withTopic(options.getOutputTopic())
      .withKeySerializer(StringSerializer.class)
      .withValueSerializer(StringSerializer.class);

  Window<KV<Integer, MyClass>> window = Window
      .<KV<Integer, MyClass>>into(
          FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
      .accumulatingFiredPanes()
      .withAllowedLateness(Duration.standardDays(365L))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardSeconds(1L)))
          .withLateFirings(AfterPane.elementCountAtLeast(1)));

  PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
      .apply("Window", window)
      .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());

  toFormat
      .apply("FormatResults", MapElements
          .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
          .via((Long count) -> KV.of("count", count.toString())))
      .apply(kafkaOutput);

  p.run();
}
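For clarity, the timestamp logic passed to withTimestampFn above does the following (shown here standalone with java.time instead of the Joda-time classes the pipeline uses, just to illustrate the fallback to processing time when an event has no date):

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampDemo {
    // Mirrors the withTimestampFn lambda: parse the event date if present,
    // otherwise fall back to "now" (processing time).
    static Instant eventTimestamp(String date) {
        if (date == null) {
            return Instant.now();
        }
        // java.time equivalent of Joda's pattern "yyyy-MM-dd'T'HH:mm:ssZ"
        return OffsetDateTime
            .parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ"))
            .toInstant();
    }

    public static void main(String[] args) {
        // An event with a date gets that date as its event timestamp.
        System.out.println(eventTimestamp("2020-01-15T10:30:00+0000")); // 2020-01-15T10:30:00Z
        // An event without a date gets the current time.
        System.out.println(eventTimestamp(null));
    }
}
```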
The idea is very simple: read some events from a Kafka topic, group them into windows, count them, and write the result to another Kafka topic.
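To make sure I understand the windowing step, here is how I believe FixedWindows assigns elements (a plain-Java sketch of my understanding, not Beam code): an element with timestamp t goes into the window starting at t minus (t mod window size).

```java
public class FixedWindowDemo {
    // Sketch of fixed-window assignment: the window containing a timestamp
    // starts at the timestamp rounded down to a multiple of the window size.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5-minute windows, like getWindowSize() = 5
        // Two events one second apart land in the same window,
        // so they should be counted together.
        System.out.println(windowStart(1_000_000L, size)); // 900000
        System.out.println(windowStart(1_001_000L, size)); // 900000
    }
}
```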
I'm a little confused by the result: the code above only produces one entry with a count of "1", while there are a lot of events (around 500) in the source topic.
Do you have any suggestions to help me figure this out? Is there something I'm doing wrong here?
Regards