Does it have the same behavior in the direct runner? What are the sizes of
intermediate PCollections?
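
To make the second question concrete, here is a minimal sketch in plain Java (no Beam involved) of the per-window counts you could compute by hand from a sample of your event timestamps and compare against what the pipeline emits. WindowCountSketch and its methods are hypothetical names made up for illustration, the sample timestamps are invented, and the sketch assumes fixed windows aligned to the epoch with no offset. It ignores triggers, panes, and lateness entirely.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Beam: buckets event timestamps into
// fixed windows so the expected per-window counts can be checked by hand.
class WindowCountSketch {

  // Start of the fixed window containing the timestamp.
  // Assumption: windows are aligned to the epoch with no offset.
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
  }

  // Number of events per window, keyed by window start (sorted).
  static Map<Instant, Long> countPerWindow(List<Instant> timestamps, Duration size) {
    Map<Instant, Long> counts = new TreeMap<>();
    for (Instant t : timestamps) {
      counts.merge(windowStart(t, size), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Invented sample data: two events in one minute, one in the next.
    List<Instant> events = List.of(
        Instant.parse("2020-01-08T12:00:05Z"),
        Instant.parse("2020-01-08T12:00:40Z"),
        Instant.parse("2020-01-08T12:01:10Z"));
    countPerWindow(events, Duration.ofMinutes(1))
        .forEach((start, n) -> System.out.println(start + " -> " + n));
  }
}
```

If the counts you get this way differ a lot from what lands in the output topic, that narrows the problem down to timestamps, windowing, or triggering rather than the Kafka read itself.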


On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola wrote:

> Hi all,
> I'm doing some tests with beam and apache flink. I'm running the code
> below:
>   public static void main(String[] args) throws IOException {
>     WorkflowStepOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation()
>             .as(WorkflowStepOptions.class);
>"Options Kafka server {} input topic {} output topic {}
> window size {} group id {} step name {}",
>             options.getKafkaBrokers(), options.getTopics(),
> options.getOutputTopic(), options.getWindowSize(),
>             options.getGroupId(), workflowStepName);
>     Pipeline p = Pipeline.create(options);
>     CoderRegistry cr = p.getCoderRegistry();
>     cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>     KafkaIO.Read<Integer, MyClass> kafkaIOReader =
> KafkaIO.<Integer,MyClass>read()
>             .withBootstrapServers(options.getKafkaBrokers())
>             .withTopics(Arrays.asList(options.getTopics().split(",")))
>             .withKeyDeserializer(IntegerDeserializer.class)
>             .withValueDeserializer(MyClassEventDeserializer.class)
>             //.withTimestampPolicyFactory(new
> MyClassTimestampPolicyFactory())
>             .withTimestampFn((KV<Integer,MyClass> event) ->
>                     event.getValue().getDate() == null ?
>                    :
>                             Instant.parse(event.getValue().getDate(),
> DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>             .withConsumerConfigUpdates(
>                     ImmutableMap.of(
>                             "", options.getGroupId(),
>                             "auto.offset.reset", "earliest")
>             );
>     KafkaIO.Write<String, String> kafkaOutput = KafkaIO.<String,
> String>write()
>             .withBootstrapServers(options.getKafkaBrokers())
>             .withTopic(options.getOutputTopic())
>             .withKeySerializer(StringSerializer.class)
>             .withValueSerializer(StringSerializer.class);
>     Window<KV<Integer, MyClass>> window = Window
>             .<KV<Integer,
> MyClass>>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>             .accumulatingFiredPanes()
>             .withAllowedLateness(Duration.standardDays(365L))
>             .triggering(AfterWatermark.pastEndOfWindow()
>                     .withEarlyFirings(
>                             AfterProcessingTime
>                                     .pastFirstElementInPane()
> .plusDelayOf(Duration.standardSeconds(1L)))
>                     .withLateFirings(
>                             AfterPane
>                                     .elementCountAtLeast(1))
>             );
>     PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
>             .apply("Window", window)
>             .apply(Combine.globally(Count.<KV<Integer,
> MyClass>>combineFn()).withoutDefaults());
>     toFormat
>             .apply("FormatResults",
>                     MapElements
> .into(TypeDescriptors.kvs(TypeDescriptors.strings(),TypeDescriptors.strings()))
>                             .via((Long count) ->
>                             {
>                               return KV.of("count", count.toString());
>                             })
>             )
>             .apply(kafkaOutput);
>   }
> The idea is very simple: read some events from a Kafka topic, group them
> into a window, count them, and put the result in another Kafka topic.
> I'm a little confused by the result: the code above produces only
> one entry with a count of "1", while I have a lot of events (around 500) in
> the source topic.
> Do you have any suggestions for tracking down the problem? Is there
> something I'm doing wrong here?
> Regards
