Does it have the same behavior in the direct runner? What are the sizes of intermediate PCollections?
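The second question can be partly checked outside the pipeline. With FixedWindows, each window's count only covers the events whose timestamps fall inside that window. The sketch below is a minimal JDK-only illustration under a few assumptions. It uses a zero window offset and java.time rather than the Joda-Time types the pipeline uses. The class name FixedWindowBuckets and the sample timestamps are hypothetical. It computes each timestamp's window start the way FixedWindows does for a zero offset (start = ts - ts mod size) and tallies events per window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups event timestamps into fixed windows (zero offset)
// and counts how many events land in each window.
public class FixedWindowBuckets {

    /** Start of the fixed window containing ts, assuming a zero window offset. */
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        long sizeMillis = size.toMillis();
        // floorMod keeps the result aligned to window boundaries even for pre-epoch timestamps.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    /** Number of timestamps per window, keyed (and sorted) by window start. */
    static Map<Instant, Long> countPerWindow(List<Instant> timestamps, Duration size) {
        Map<Instant, Long> counts = new TreeMap<>();
        for (Instant ts : timestamps) {
            counts.merge(windowStart(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample event dates; replace with the real dates read from the topic.
        List<Instant> events = List.of(
            Instant.parse("2020-01-08T13:00:10Z"),
            Instant.parse("2020-01-08T13:00:50Z"),
            Instant.parse("2020-01-08T13:07:00Z"),
            Instant.parse("2019-03-01T09:30:00Z"));
        Map<Instant, Long> counts = countPerWindow(events, Duration.ofMinutes(5));
        counts.forEach((start, n) -> System.out.println(start + " -> " + n));
    }
}
```

With 5-minute windows, main prints `2019-03-01T09:30:00Z -> 1`, `2020-01-08T13:00:00Z -> 2` and `2020-01-08T13:05:00Z -> 1`. Feeding in the real event dates shows how many elements each window should contain before any trigger fires.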
Kenn

On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola <andresgaragi...@gmail.com> wrote:

> Hi all,
>
> I'm doing some tests with Beam and Apache Flink. I'm running the code
> below:
>
>     public static void main(String[] args) throws IOException {
>         WorkflowStepOptions options =
>             PipelineOptionsFactory.fromArgs(args).withValidation()
>                 .as(WorkflowStepOptions.class);
>         logger.info("Options Kafka server {} input topic {} output topic {} window size {} group id {} step name {}",
>             options.getKafkaBrokers(), options.getTopics(),
>             options.getOutputTopic(), options.getWindowSize(),
>             options.getGroupId(), workflowStepName);
>         Pipeline p = Pipeline.create(options);
>
>         CoderRegistry cr = p.getCoderRegistry();
>         cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>
>         KafkaIO.Read<Integer, MyClass> kafkaIOReader =
>             KafkaIO.<Integer, MyClass>read()
>                 .withBootstrapServers(options.getKafkaBrokers())
>                 .withTopics(Arrays.asList(options.getTopics().split(",")))
>                 .withKeyDeserializer(IntegerDeserializer.class)
>                 .withValueDeserializer(MyClassEventDeserializer.class)
>                 //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
>                 .withTimestampFn((KV<Integer, MyClass> event) ->
>                     event.getValue().getDate() == null
>                         ? Instant.now()
>                         : Instant.parse(event.getValue().getDate(),
>                             DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>                 .withConsumerConfigUpdates(
>                     ImmutableMap.of(
>                         "group.id", options.getGroupId(),
>                         "auto.offset.reset", "earliest"));
>
>         KafkaIO.Write<String, String> kafkaOutput =
>             KafkaIO.<String, String>write()
>                 .withBootstrapServers(options.getKafkaBrokers())
>                 .withTopic(options.getOutputTopic())
>                 .withKeySerializer(StringSerializer.class)
>                 .withValueSerializer(StringSerializer.class);
>
>         Window<KV<Integer, MyClass>> window = Window
>             .<KV<Integer, MyClass>>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>             .accumulatingFiredPanes()
>             .withAllowedLateness(Duration.standardDays(365L))
>             .triggering(AfterWatermark.pastEndOfWindow()
>                 .withEarlyFirings(
>                     AfterProcessingTime
>                         .pastFirstElementInPane()
>                         .plusDelayOf(Duration.standardSeconds(1L)))
>                 .withLateFirings(
>                     AfterPane
>                         .elementCountAtLeast(1)));
>
>         PCollection<Long> toFormat = p.apply(kafkaIOReader.withoutMetadata())
>             .apply("Window", window)
>             .apply(Combine.globally(Count.<KV<Integer, MyClass>>combineFn()).withoutDefaults());
>
>         toFormat
>             .apply("FormatResults",
>                 MapElements
>                     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>                     .via((Long count) -> {
>                         return KV.of("count", count.toString());
>                     }))
>             .apply(kafkaOutput);
>
>         p.run();
>     }
>
> The idea is very simple: read some events from a Kafka topic, group them
> into a window, count them, and write the result to another Kafka topic.
>
> I'm a little confused by the result: the code above produces only one
> entry, with a count of "1", while I have a lot of events (around 500) in
> the source topic.
>
> Do you have any suggestions for figuring out the problem? Is there
> something I'm doing wrong here?
>
> Regards