kennknowles commented on code in PR #28853:
URL: https://github.com/apache/beam/pull/28853#discussion_r1369156837
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
.apply(
"ExpandIterable",
ParDo.of(
- new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
+ new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
- @Element KV<K, Iterable<TimestampedValue<V>>> element,
- OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
- for (TimestampedValue<V> value : element.getValue()) {
+ for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
- .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());
+ .apply(
+ "RestoreOriginalWindows",
+ Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+ .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
+ // Set the windowing strategy directly, so that it doesn't get counted as the user having
+ // set allowed lateness.
+ .setWindowingStrategyInternal(originalStrategy);
+ }
+
+ private static class RestoreWindowsFn<K, V, W extends BoundedWindow>
+ extends NonMergingWindowFn<KV<K, ValueInSingleWindow<V>>, W> {
+
+ private final WindowFn<?, W> originalWindowFn;
+
+ private RestoreWindowsFn(WindowFn<?, W> originalWindowFn) {
+ this.originalWindowFn = originalWindowFn;
+ }
+
+ @Override
+ public Collection<W> assignWindows(AssignContext c) throws Exception {
+ return Collections.singleton((W) c.element().getValue().getWindow());
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.isCompatible() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.verifyCompatibility() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public Coder<W> windowCoder() {
+ return originalWindowFn.windowCoder();
+ }
+
+ @Override
+ public WindowMappingFn<W> getDefaultWindowMappingFn() {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s.getDefaultWindowMappingFn() should never be called."
+ + " It is a private implementation detail of sdk utilities."
+ + " This message indicates a bug in the Beam SDK.",
+ getClass().getCanonicalName()));
+ }
+ }
+
+ private static class RestoreTimestamps<K, V>
+ extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
Review Comment:
Noting that this is not a change but a move. But no, the runner does not act
on it. In fact, it is unsafe for exactly this reason: the runner may well drop
data that you have allowed to be output.
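To make the concern concrete, here is a hedged plain-Java model (not Beam code; `SkewModel` and both of its rules are hypothetical simplifications). Beam's own Javadoc for the deprecated `DoFn.getAllowedTimestampSkew()` warns that it lets a `DoFn` emit elements behind the watermark, and that such late elements may be silently dropped if they fall behind the allowed lateness of a downstream `PCollection`. The model keeps the two checks separate: the SDK-side skew check that the override relaxes, and the runner-side lateness check that it does not touch.

```java
// Hypothetical plain-Java model, not Beam code: two independent checks applied
// to an element that a DoFn re-emits with an earlier timestamp.
final class SkewModel {
  // SDK-side check (assumed simplification): the output timestamp may move back
  // from the input timestamp by at most allowedSkewMillis.
  static boolean skewCheckPasses(long inputTs, long outputTs, long allowedSkewMillis) {
    if (outputTs >= inputTs) {
      return true; // moving forward in time is always allowed in this model
    }
    try {
      return Math.subtractExact(inputTs, outputTs) <= allowedSkewMillis;
    } catch (ArithmeticException overflow) {
      return false; // the backward shift exceeds any representable skew
    }
  }

  // Runner-side check (assumed simplification): once the watermark passes the end
  // of the element's window plus the allowed lateness, the element may be dropped.
  // allowedLatenessMillis is assumed to be non-negative.
  static boolean droppable(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
    long deadline =
        windowEndMillis > Long.MAX_VALUE - allowedLatenessMillis
            ? Long.MAX_VALUE // saturate instead of overflowing
            : windowEndMillis + allowedLatenessMillis;
    return watermarkMillis > deadline;
  }
}
```

With an unlimited skew (`Long.MAX_VALUE`, as in the diff), `skewCheckPasses(1_000L, 0L, Long.MAX_VALUE)` is true, yet `droppable(100L, 0L, 1_000L)` is also true: in this model the override silences only the first check, which is the point made in the comment.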
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
Review Comment:
I had that same concern and was coming down on the side of "we should change
this anyhow". But, looking into it, every single runner directly implements
Reshuffle. So this is essentially just a fix for the reference implementation
and the default for runners that don't implement it yet (like Dataflow v2).
So I actually need a `ValidatesRunner` test, which I presume most runners will
fail. But we don't have to worry about update incompatibility unless/until
those runners come into compliance. I still think this change should happen.
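The `ValidatesRunner` test mentioned above would assert that Reshuffle's output carries exactly the same elements, timestamps, and windows as its input. As a hedged sketch of that property only (a real test would run through Beam's `TestPipeline` and `PAssert` under the `ValidatesRunner` category), the plain-Java model below compares the multiset of `(key, value, timestamp, window)` tuples before and after a modeled reshuffle. `Elem`, `ReshuffleCheck`, and the `restoreMetadata = false` behavior (stamping every element with its group's earliest timestamp, loosely echoing `TimestampCombiner.EARLIEST` in the diff) are hypothetical stand-ins for a runner implementation that does not restore per-element metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model element: key, value, and the metadata Reshuffle should preserve.
final class Elem {
  final String key;
  final String value;
  final long timestamp;
  final String window; // windows modeled as opaque labels such as "[0,10)"

  Elem(String key, String value, long timestamp, String window) {
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
    this.window = window;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Elem)) return false;
    Elem e = (Elem) o;
    return key.equals(e.key) && value.equals(e.value)
        && timestamp == e.timestamp && window.equals(e.window);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, value, timestamp, window);
  }
}

final class ReshuffleCheck {
  // Groups by (key, window), then re-emits each element. When restoreMetadata is
  // false, every element in a group gets the group's earliest timestamp (hypothetical
  // non-compliant behavior); when true, each element keeps its own reified metadata.
  static List<Elem> reshuffle(List<Elem> input, boolean restoreMetadata) {
    Map<List<String>, List<Elem>> groups = new LinkedHashMap<>();
    for (Elem e : input) {
      groups.computeIfAbsent(Arrays.asList(e.key, e.window), k -> new ArrayList<>()).add(e);
    }
    List<Elem> out = new ArrayList<>();
    for (List<Elem> group : groups.values()) {
      long earliest = Long.MAX_VALUE;
      for (Elem e : group) earliest = Math.min(earliest, e.timestamp);
      for (Elem e : group) {
        out.add(restoreMetadata ? e : new Elem(e.key, e.value, earliest, e.window));
      }
    }
    return out;
  }

  // The property a ValidatesRunner-style test would assert: identical multisets.
  static boolean preservesMetadata(List<Elem> in, List<Elem> out) {
    return counts(in).equals(counts(out));
  }

  private static Map<Elem, Integer> counts(List<Elem> xs) {
    Map<Elem, Integer> m = new HashMap<>();
    for (Elem x : xs) m.merge(x, 1, Integer::sum);
    return m;
  }
}
```

Comparing multisets rather than lists keeps the check independent of output order, which a shuffle does not guarantee.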
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
.apply(
"ExpandIterable",
ParDo.of(
- new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
+ new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
- @Element KV<K, Iterable<TimestampedValue<V>>> element,
- OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
- for (TimestampedValue<V> value : element.getValue()) {
+ for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
- .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());
+ .apply(
+ "RestoreOriginalWindows",
+ Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+ .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
Review Comment:
Ha, of course. That's the whole point.
I got caught up in the fact that I could not restore windows and timestamps
in a single ParDo and had to rewindow. But in fact, I don't think there is a
way to restore the PaneInfo without changing the model after all, to at least
allow direct manipulation of it.
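The two-step restore in the diff follows from how Beam exposes metadata: a `DoFn` can change an element's timestamp (for example with `OutputReceiver.outputWithTimestamp`) but cannot move it into a different window, so windows are restored with `Window.into(...)` and a custom `WindowFn`, and timestamps with a separate `ParDo`. The hedged plain-Java model below mirrors that split and the comment's conclusion. `PaneModel`, its `Timing` enum, and the `afterGrouping` parameter are hypothetical; the model simply has no operation that overwrites the pane, standing in for the lack of a model-level way to manipulate `PaneInfo` directly.

```java
// Hypothetical plain-Java model of the restore steps that run after grouping.
final class PaneModel {
  // Loosely mirrors the timing values of Beam's PaneInfo (EARLY, ON_TIME, LATE, UNKNOWN).
  enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

  // An element's metadata as seen by a downstream transform.
  static final class Meta {
    final long timestamp;
    final String window; // windows modeled as opaque labels
    final Timing pane;

    Meta(long timestamp, String window, Timing pane) {
      this.timestamp = timestamp;
      this.window = window;
      this.pane = pane;
    }
  }

  // Step 1, analogous to RestoreWindowsFn: take the window recorded before grouping.
  static Meta restoreWindow(Meta current, Meta reified) {
    return new Meta(current.timestamp, reified.window, current.pane);
  }

  // Step 2, analogous to RestoreTimestamps: take the timestamp recorded before grouping.
  static Meta restoreTimestamp(Meta current, Meta reified) {
    return new Meta(reified.timestamp, current.window, current.pane);
  }

  // Full restore: window and timestamp come back, but the pane is still whatever
  // the grouping step assigned, because neither step can set it.
  static Meta restore(Meta reified, Meta afterGrouping) {
    return restoreTimestamp(restoreWindow(afterGrouping, reified), reified);
  }
}
```

Even though the original pane is available inside the reified value, nothing in either step can write it back, so the element leaves with the pane produced by the grouping.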