robertwb commented on code in PR #28853:
URL: https://github.com/apache/beam/pull/28853#discussion_r1367531743
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>>
input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted
as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
Review Comment:
Won't this break backwards compatibility for everything using a shuffle
(though I concede it's fixing a bug)?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>>
input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted
as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
.apply(
"ExpandIterable",
ParDo.of(
- new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K,
TimestampedValue<V>>>() {
+ new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K,
ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
- @Element KV<K, Iterable<TimestampedValue<V>>> element,
- OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
- for (TimestampedValue<V> value : element.getValue()) {
+ for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
- .apply("RestoreOriginalTimestamps",
ReifyTimestamps.extractFromValues());
+ .apply(
+ "RestoreOriginalWindows",
+ Window.into(new
RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+ .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
+ // Set the windowing strategy directly, so that it doesn't get counted
as the user having
+ // set allowed lateness.
+ .setWindowingStrategyInternal(originalStrategy);
+ }
+
+ private static class RestoreWindowsFn<K, V, W extends BoundedWindow>
+ extends NonMergingWindowFn<KV<K, ValueInSingleWindow<V>>, W> {
+
+ private final WindowFn<?, W> originalWindowFn;
+
+ private RestoreWindowsFn(WindowFn<?, W> originalWindowFn) {
+ this.originalWindowFn = originalWindowFn;
+ }
+
+ @Override
+ public Collection<W> assignWindows(AssignContext c) throws Exception {
+ return Collections.singleton((W) c.element().getValue().getWindow());
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.isCompatible() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws
IncompatibleWindowException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.verifyCompatibility() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public Coder<W> windowCoder() {
+ return originalWindowFn.windowCoder();
+ }
+
+ @Override
+ public WindowMappingFn<W> getDefaultWindowMappingFn() {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.getSideInputWindow() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+ }
+
+ private static class RestoreTimestamps<K, V>
+ extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>,
PCollection<KV<K, V>>> {
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K,
ValueInSingleWindow<V>>> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
Review Comment:
The runner doesn't read or act on this value anywhere, does it?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>>
input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted
as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
.apply(
"ExpandIterable",
ParDo.of(
- new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K,
TimestampedValue<V>>>() {
+ new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K,
ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
- @Element KV<K, Iterable<TimestampedValue<V>>> element,
- OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
- for (TimestampedValue<V> value : element.getValue()) {
+ for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
- .apply("RestoreOriginalTimestamps",
ReifyTimestamps.extractFromValues());
+ .apply(
+ "RestoreOriginalWindows",
+ Window.into(new
RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+ .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
Review Comment:
What about the PaneInfo?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]