yelianevich opened a new issue, #33624:
URL: https://github.com/apache/beam/issues/33624
### What happened?
Hello everyone,
I’ve written a test that reproduces an issue with GroupIntoBatches when it’s placed between two aggregations.
The test simulates a scenario where GroupIntoBatches sits between two aggregations that use different FixedWindows configurations. The two windows differ in only one setting: the first uses accumulatingFiredPanes, while the second uses discardingFiredPanes.
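The accumulating/discarding distinction is what drives the counts the test expects. As a rough illustration only, the toy model below is plain Java, not Beam's implementation, and the class `PaneModel` and method `firePanes` are hypothetical names. It shows what each successive firing of a single window emits under each mode, given the elements that arrived since the previous firing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Beam code) of the panes one window emits
 * under accumulatingFiredPanes() versus discardingFiredPanes().
 */
final class PaneModel {
    private PaneModel() {}

    /**
     * @param arrivalsPerFiring elements that reached the window before each successive firing
     * @param accumulating true to mimic accumulatingFiredPanes(), false for discardingFiredPanes()
     * @return the contents of each emitted pane, in firing order
     */
    static List<List<String>> firePanes(List<List<String>> arrivalsPerFiring, boolean accumulating) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffered = new ArrayList<>();
        for (List<String> arrivals : arrivalsPerFiring) {
            if (!accumulating) {
                // Discarding mode: what the previous pane emitted is dropped once it has fired.
                buffered.clear();
            }
            buffered.addAll(arrivals);
            // Snapshot the buffer: this is the content of the pane fired now.
            panes.add(new ArrayList<>(buffered));
        }
        return panes;
    }

    public static void main(String[] args) {
        // Arrivals before the on-time firing, then before the window closes.
        List<List<String>> arrivals = List.of(List.of("one", "two"), List.of("three", "four"));
        System.out.println(firePanes(arrivals, true));  // [[one, two], [one, two, three, four]]
        System.out.println(firePanes(arrivals, false)); // [[one, two], [three, four]]
    }
}
```

With the test's data, the accumulating first window emits a 2-element pane and then a 4-element pane, which is why the second (discarding) aggregation is expected to count 2 and then 4. The model ignores allowed lateness, so element `five`, which the test expects to be dropped, is simply left out.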
Here’s what I’ve observed:
- Without GroupIntoBatches, the pane fired by the first aggregation propagates immediately to the second aggregation.
- With GroupIntoBatches in between, the second aggregation produces output only after the first aggregation fires its second pane.
Is there a known workaround for this behavior? Could I be missing any
configuration or specific tuning to address this issue?
Thank you!
```java
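// Assumes JUnit 5 (the test method is package-private). DataEntry, LoggerDoFn, KvLoggerDoFn
// and CountCombiner are the reporter's own helper classes (not shown), and `pipeline` is the
// test's TestPipeline instance.
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;
import static org.joda.time.Instant.EPOCH;

import java.util.List;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.junit.jupiter.api.Test;

@Test
void whenGroupIntoBatchesBetweenAggregates_thenSecondAggregateOutputWithAdditionalDelayOfWindowDuration() {
  DataEntry one = DataEntry.builder().entryId("counter").qualifier("one").build();
  DataEntry two = DataEntry.builder().entryId("counter").qualifier("two").build();
  DataEntry three = DataEntry.builder().entryId("counter").qualifier("three").build();
  DataEntry four = DataEntry.builder().entryId("counter").qualifier("four").build();
  DataEntry five = DataEntry.builder().entryId("counter").qualifier("five").build();

  TestStream<DataEntry> createEvents =
      TestStream.create(SerializableCoder.of(DataEntry.class))
          .addElements(TimestampedValue.of(one, EPOCH))
          .advanceWatermarkTo(EPOCH.plus(standardMinutes(1L)))
          .addElements(TimestampedValue.of(two, EPOCH.plus(standardSeconds(20L))))
          .advanceWatermarkTo(EPOCH.plus(standardMinutes(5L)))
          .addElements(TimestampedValue.of(three, EPOCH.plus(standardSeconds(40L))))
          .advanceWatermarkTo(EPOCH.plus(standardMinutes(8L)))
          .addElements(TimestampedValue.of(four, EPOCH.plus(standardSeconds(42L))))
          .advanceWatermarkTo(EPOCH.plus(standardMinutes(10L)))
          // arrived after allowed lateness - ignored
          .addElements(TimestampedValue.of(five, EPOCH.plus(standardSeconds(42L))))
          .advanceWatermarkToInfinity();

  PCollection<KV<String, Integer>> pCollection = pipeline
      .apply("values", createEvents)
      .apply(
          Window.<DataEntry>into(FixedWindows.of(Duration.standardMinutes(5)))
              .triggering(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
              .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
              .withAllowedLateness(
                  Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
              .accumulatingFiredPanes()) // the only difference from the next window
      .apply(ParDo.of(new LoggerDoFn("Input")))
      // aggregate using the first window
      .apply(WithKeys.of(DataEntry::getEntryId).withKeyType(strings()))
      .apply(GroupByKey.create())
      .apply(Values.create())
      .apply(Flatten.iterables())
      .apply(ParDo.of(new LoggerDoFn("After Accum Aggregate")))
      // batch elements
      .apply("DummyKey", WithKeys.of(""))
      .apply(GroupIntoBatches.<String, DataEntry>ofSize(1)
          .withMaxBufferingDuration(Duration.standardSeconds(1))
          .withShardedKey())
      .apply(Values.create())
      .apply(Flatten.iterables())
      .apply(
          Window.<DataEntry>into(FixedWindows.of(Duration.standardMinutes(5)))
              .triggering(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
              .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
              .withAllowedLateness(
                  Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
              .discardingFiredPanes()) // the only difference from the previous window
      .apply(ParDo.of(new LoggerDoFn("Discarding Grouping Window")))
      // aggregate using the second window
      .apply(WithKeys.of(DataEntry::getEntryId).withKeyType(strings()))
      .apply(Combine.perKey(new CountCombiner()))
      .apply(ParDo.of(new KvLoggerDoFn("Final Combine")));

  // Assumption: `boundedWindow` is not defined in the original snippet. Every timestamp falls
  // in [EPOCH, EPOCH + 5 min), so it is taken here to be that first 5-minute window.
  IntervalWindow boundedWindow = new IntervalWindow(EPOCH, EPOCH.plus(standardMinutes(5L)));

  // expected
  PAssert.that("On Time Pane", pCollection)
      .inOnTimePane(boundedWindow)
      // but actually contains only the last KV.of("counter", 4)
      .containsInAnyOrder(List.of(KV.of("counter", 2)));
  PAssert.that("Late Pane", pCollection)
      .inLatePane(boundedWindow)
      .containsInAnyOrder(List.of(KV.of("counter", 4)));
  PAssert.that("Full Window", pCollection)
      .inWindow(boundedWindow)
      .containsInAnyOrder(List.of(KV.of("counter", 2), KV.of("counter", 4)));

  pipeline.run().waitUntilFinish();
}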
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [x] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner