kennknowles commented on code in PR #36373:
URL: https://github.com/apache/beam/pull/36373#discussion_r2402427919


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -75,11 +78,20 @@ private static ByteString encodeMetadata(
       ByteStringOutputStream stream,
       Coder<Collection<? extends BoundedWindow>> windowsCoder,
       Collection<? extends BoundedWindow> windows,
-      PaneInfo paneInfo)
+      PaneInfo paneInfo,
+      BeamFnApi.Elements.ElementMetadata metadata)
       throws IOException {
     try {
-      PaneInfoCoder.INSTANCE.encode(paneInfo, stream);
-      windowsCoder.encode(windows, stream, Coder.Context.OUTER);
+      // element metadata is behind the experiment
+      boolean elementMetadata = WindowedValues.WindowedValueCoder.isMetadataSupported();
+      if (elementMetadata) {
+        PaneInfoCoder.INSTANCE.encode(paneInfo, stream);

Review Comment:
   IIRC in the design there would be a bit in the PaneInfo that indicates if 
there is any metadata in the stream, right? Or I guess that is only for the 
coder and we always put it in the WindmillSink/WindmillSource exchange?
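
   To make the flag-bit idea concrete, here is a minimal, self-contained sketch.
   It is not the actual PaneInfo encoding or the Windmill wire format: the class
   name `MetadataFlagSketch`, the bit position `HAS_METADATA_BIT`, and the
   one-byte length prefix are all assumptions chosen for illustration. It only
   shows how a single header bit can tell a decoder whether a metadata block
   follows.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class MetadataFlagSketch {
  // Hypothetical flag bit in the header byte; NOT the real PaneInfo layout.
  static final int HAS_METADATA_BIT = 0x80;

  // Writes one header byte, then (only if the flag is set) a length-prefixed metadata block.
  static byte[] encode(int paneBits, byte[] metadata) {
    if (metadata.length > 255) {
      throw new IllegalArgumentException("sketch uses a one-byte length prefix");
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    boolean hasMetadata = metadata.length > 0;
    int header = paneBits & ~HAS_METADATA_BIT;
    if (hasMetadata) {
      header |= HAS_METADATA_BIT;
    }
    out.write(header);
    if (hasMetadata) {
      out.write(metadata.length);
      out.write(metadata, 0, metadata.length);
    }
    return out.toByteArray();
  }

  // Reads the header byte and returns the metadata block, or an empty array if the flag is clear.
  static byte[] decodeMetadata(byte[] encoded) {
    ByteArrayInputStream in = new ByteArrayInputStream(encoded);
    int header = in.read();
    if ((header & HAS_METADATA_BIT) == 0) {
      return new byte[0];
    }
    int length = in.read();
    byte[] metadata = new byte[length];
    int read = in.read(metadata, 0, length);
    if (read != length) {
      throw new IllegalStateException("truncated metadata block");
    }
    return metadata;
  }
}
```

   With a bit like this, elements without metadata cost no extra bytes, and a
   decoder does not need an out-of-band switch to know what follows the header.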



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java:
##########
@@ -77,6 +77,32 @@ public void testWindowedValueCoder() throws CoderException {
     Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
   }
 
+  @Test
+  public void testWindowedValueWithElementMetadataCoder() throws CoderException {
+    WindowedValues.WindowedValueCoder.setMetadataSupported();
+    Instant timestamp = new Instant(1234);
+    WindowedValue<String> value =
+        WindowedValues.of(
+            "abc",
+            new Instant(1234),
+            Arrays.asList(
+                new IntervalWindow(timestamp, timestamp.plus(Duration.millis(1000))),
+                new IntervalWindow(
+                    timestamp.plus(Duration.millis(1000)), timestamp.plus(Duration.millis(2000)))),
+            PaneInfo.NO_FIRING);
+
+    Coder<WindowedValue<String>> windowedValueCoder =
+        WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder());
+
+    byte[] encodedValue = CoderUtils.encodeToByteArray(windowedValueCoder, value);
+    WindowedValue<String> decodedValue =
+        CoderUtils.decodeFromByteArray(windowedValueCoder, encodedValue);
+
+    Assert.assertEquals(value.getValue(), decodedValue.getValue());
+    Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp());
+    Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());

Review Comment:
   This doesn't actually have any extended metadata, right?



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java:
##########
@@ -135,6 +135,7 @@ public Duration getAllowedTimestampSkew() {
                 public void processElement(
                     @Element KV<K, ValueInSingleWindow<V>> kv,
                     OutputReceiver<KV<K, V>> outputReceiver) {
+                  // todo #33176 specify additional metadata in the future

Review Comment:
   Yeah, I think we'll want to find a way to update Redistribute to automatically 
carry along "all" metadata. We built a separate data type for Redistribute to 
avoid making WindowedValue public. But now that it is public, and everything 
around it is public (or public enough), we should make Redistribute resilient to 
new metadata, so that adding a field does not require editing Redistribute or 
the overrides.
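
   One way to get that resilience, sketched under heavy assumptions: if the
   carrier type exposes a "replace only the value" operation, code that rewraps
   elements never has to enumerate individual metadata fields. `EnvelopeSketch`,
   `Envelope`, and `withValue` below are hypothetical stand-ins for
   illustration, not Beam's WindowedValue or ValueInSingleWindow API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EnvelopeSketch {
  /** Hypothetical carrier type; a stand-in, not Beam's WindowedValue. */
  static final class Envelope<T> {
    final T value;
    final long timestampMillis;
    // Open-ended metadata; new entries can appear without changing callers.
    final Map<String, String> metadata;

    Envelope(T value, long timestampMillis, Map<String, String> metadata) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    // Swaps the payload and copies everything else through unchanged,
    // so callers never list metadata fields one by one.
    <U> Envelope<U> withValue(U newValue) {
      return new Envelope<>(newValue, timestampMillis, metadata);
    }
  }
}
```

   A transform that only changes the payload would call `withValue(...)` and
   pass the result on; metadata added later travels with it untouched.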


