stankiewicz commented on PR #38230:
URL: https://github.com/apache/beam/pull/38230#issuecomment-4379001452

   The new state can and will increase the amount of data processed by a few percent.
   This pipeline showed a 2% increase in data processed.
   
   ```
   /**
    * Example streaming pipeline that appears to exercise the new CausedByDrain element metadata.
    *
    * <p>Reads taxi-ride strings from the Pub/Sub topic {@code TAXI_RIDES_TOPIC}, parses them as
    * JSON into {@link Row}s using {@code TAXI_RIDE_INFO_SCHEMA} (both constants are defined outside
    * this snippet), assigns each row a random integer key in [0, 10], places it into overlapping
    * 5-minute sliding windows that start every minute, stamps every element as caused by drain,
    * groups by key, and prints the per-key sum of {@code passenger_count} together with the
    * element's drain flag.
    *
    * <p>NOTE(review): {@code p.run()} is not followed by {@code waitUntilFinish()}, so whether this
    * method blocks until the pipeline completes is runner-dependent — confirm for the target runner.
    *
    * @param args command-line arguments parsed into {@link PipelineOptions}
    */
   public static void main(String[] args) {
       PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
       Pipeline p = Pipeline.create(options);
   
       p
           // Read taxi ride data: each Pub/Sub message payload is read as a String.
           .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(TAXI_RIDES_TOPIC))
           // Convert JSON strings to Beam Rows
           .apply("JsonToRow", JsonToRow.withSchema(TAXI_RIDE_INFO_SCHEMA))
           // Key each row with a random Integer in [0, 10] inclusive (nextInt(11) never returns 11).
           // The key type is declared explicitly via withKeyType, presumably so an Integer key
           // coder can be inferred.
           // NOTE(review): the key is non-deterministic, so a retried element may get a different key.
           .apply(
               "WithRandomKey",
               WithKeys.of(
                       new SerializableFunction<Row, Integer>() {
                         @Override
                         public Integer apply(Row input) {
                           return ThreadLocalRandom.current().nextInt(11);
                         }
                       })
                   .withKeyType(TypeDescriptors.integers()))
           // Sliding windows: 5-minute length, a new window every 1 minute (windows overlap).
           .apply(
               "SlidingWindow",
               Window.<KV<Integer, Row>>into(
                   SlidingWindows.of(Duration.standardMinutes(5)).every(Duration.standardMinutes(1))))
           // Despite the step name, nothing is filtered here: every element is re-emitted unchanged,
           // but with its CausedByDrain metadata set to CAUSED_BY_DRAIN via the output builder
           // (builder/setCausedByDrain API presumably introduced by this PR — confirm).
           .apply(
               "FilterRows",
               ParDo.of(
                   new DoFn<KV<Integer, Row>, KV<Integer, Row>>() {
                     @ProcessElement
                     public void processElement(
                         @Element KV<Integer, Row> kv, OutputReceiver<KV<Integer, Row>> receiver) {
                       receiver.builder(kv).setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output();
                     }
                   }))
           // Group rows by key (within each window).
           .apply("GroupByKey", GroupByKey.<Integer, Row>create())
           // Terminal ParDo over KV<Integer, Iterable<Row>>; emits nothing (output type Void).
           .apply(
               "ProcessGroupedRows",
               ParDo.of(
                   new DoFn<KV<Integer, Iterable<Row>>, Void>() {
                     // Assumes the SDK injects the element's CausedByDrain value when it is
                     // declared as a @ProcessElement parameter (new API — confirm).
                     @ProcessElement
                     public void processElement(
                         @Element KV<Integer, Iterable<Row>> kv, CausedByDrain drain) {
                       Integer key = kv.getKey();
                       Iterable<Row> rows = kv.getValue();
   
                       // Sum passenger_count across the group; rows with a null count are skipped.
                       int passengerSum = 0;
                       for (Row row : rows) {
                         Integer passengers = row.getInt32("passenger_count");
                         if (passengers != null) {
                           passengerSum += passengers;
                         }
                       }
                       // Printed to stdout of whichever process runs this DoFn (on a distributed
                       // runner, presumably the worker logs).
                       System.out.println(
                           "Partition "
                               + key
                               + " had a sum of "
                               + passengerSum
                               + " passengers. Aggregation was drained? - "
                               + drain);
                     }
                   }));
   
       p.run();
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to