kennknowles commented on code in PR #28853:
URL: https://github.com/apache/beam/pull/28853#discussion_r1369156837


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> 
input) {
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
+    PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+        input
+            .apply("SetIdentityWindow", rewindow)
+            .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+    PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+        reified.apply(GroupByKey.create());
+    return grouped
         .apply(
             "ExpandIterable",
             ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, 
TimestampedValue<V>>>() {
+                new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, 
ValueInSingleWindow<V>>>() {
                   @ProcessElement
                   public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+                      @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                      OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
                     K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
+                    for (ValueInSingleWindow<V> value : element.getValue()) {
                       r.output(KV.of(key, value));
                     }
                   }
                 }))
-        .apply("RestoreOriginalTimestamps", 
ReifyTimestamps.extractFromValues());
+        .apply(
+            "RestoreOriginalWindows",
+            Window.into(new 
RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+        .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())
+        // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
+        // set allowed lateness.
+        .setWindowingStrategyInternal(originalStrategy);
+  }
+
+  private static class RestoreWindowsFn<K, V, W extends BoundedWindow>
+      extends NonMergingWindowFn<KV<K, ValueInSingleWindow<V>>, W> {
+
+    private final WindowFn<?, W> originalWindowFn;
+
+    private RestoreWindowsFn(WindowFn<?, W> originalWindowFn) {
+      this.originalWindowFn = originalWindowFn;
+    }
+
+    @Override
+    public Collection<W> assignWindows(AssignContext c) throws Exception {
+      return Collections.singleton((W) c.element().getValue().getWindow());
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s.isCompatible() should never be called."
+                  + " It is a private implementation detail of sdk utilities."
+                  + " This message indicates a bug in the Beam SDK.",
+              getClass().getCanonicalName()));
+    }
+
+    @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws 
IncompatibleWindowException {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s.verifyCompatibility() should never be called."
+                  + " It is a private implementation detail of sdk utilities."
+                  + " This message indicates a bug in the Beam SDK.",
+              getClass().getCanonicalName()));
+    }
+
+    @Override
+    public Coder<W> windowCoder() {
+      return originalWindowFn.windowCoder();
+    }
+
+    @Override
+    public WindowMappingFn<W> getDefaultWindowMappingFn() {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s.getSideInputWindow() should never be called."
+                  + " It is a private implementation detail of sdk utilities."
+                  + " This message indicates a bug in the Beam SDK.",
+              getClass().getCanonicalName()));
+    }
+  }
+
+  private static class RestoreTimestamps<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, 
PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, 
ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);

Review Comment:
   Noting that this is not a change but a move. But no, the runner does not act 
on it. In fact, it is unsafe for this reason: the runner may well drop data 
that you have allowed to be output.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> 
input) {
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
+    PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+        input
+            .apply("SetIdentityWindow", rewindow)
+            .apply("ReifyOriginalMetadata", Reify.windowsInValue());

Review Comment:
   I had that same concern and was coming down on the side of "we should change 
this anyhow". BUT looking into it, every single runner directly implements 
reshuffle. So this is essentially just a fix for the reference implementation 
and the default for runners that don't implement it yet (like Dataflow v2).
   
   So I actually need a `ValidatesRunner` test, which I presume most runners will 
fail. But we don't have to worry about update incompatibility unless/until 
those runners come into compliance. I still think this change should happen.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java:
##########
@@ -81,28 +88,107 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> 
input) {
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
+    PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+        input
+            .apply("SetIdentityWindow", rewindow)
+            .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+    PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+        reified.apply(GroupByKey.create());
+    return grouped
         .apply(
             "ExpandIterable",
             ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, 
TimestampedValue<V>>>() {
+                new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, 
ValueInSingleWindow<V>>>() {
                   @ProcessElement
                   public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
+                      @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                      OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
                     K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
+                    for (ValueInSingleWindow<V> value : element.getValue()) {
                       r.output(KV.of(key, value));
                     }
                   }
                 }))
-        .apply("RestoreOriginalTimestamps", 
ReifyTimestamps.extractFromValues());
+        .apply(
+            "RestoreOriginalWindows",
+            Window.into(new 
RestoreWindowsFn<>(originalStrategy.getWindowFn())))
+        .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>())

Review Comment:
   Ha, of course. That is the whole point.
   
   I got caught up in the fact that I could not restore windows and timestamps 
in a single ParDo and had to do a rewindowing. But in fact, I don't think there 
is a way to restore the `PaneInfo` after all without changing the model to at 
least allow direct manipulation of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to