This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72fccfc7648 Fix protobuf build error in WindmillMap.persistDirect() 
for a removal in Dataflow Streaming Java Legacy Runner without Streaming Engine 
(#32893)
72fccfc7648 is described below

commit 72fccfc76489a1539f670d4143c77efa3329b601
Author: Minbo Bae <[email protected]>
AuthorDate: Thu Oct 24 00:44:19 2024 -0700

    Fix protobuf build error in WindmillMap.persistDirect() for a removal in 
Dataflow Streaming Java Legacy Runner without Streaming Engine (#32893)
    
    * Check WorkItemCommitRequest is buildable in WindmillStateInternalsTest
---
 CHANGES.md                                         |  1 +
 .../worker/windmill/state/WindmillMap.java         |  7 +-
 .../windmill/state/WindmillStateInternalsTest.java | 86 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6ed10f6c49d..f873455cd66 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -91,6 +91,7 @@
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners 
([#18592](https://github.com/apache/beam/issues/18592), 
[#31381](https://github.com/apache/beam/issues/31381)).
+* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming 
Java Legacy Runner without Streaming Engine 
([#32892](https://github.com/apache/beam/issues/32892)).
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] 
(Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
index aed03f33e6d..b17631a8bd0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
@@ -137,12 +137,7 @@ public class WindmillMap<K, V> extends 
AbstractWindmillMap<K, V> {
       keyCoder.encode(key, keyStream, Coder.Context.OUTER);
       ByteString keyBytes = keyStream.toByteString();
       // Leaving data blank means that we delete the tag.
-      commitBuilder
-          .addValueUpdatesBuilder()
-          .setTag(keyBytes)
-          .setStateFamily(stateFamily)
-          .getValueBuilder()
-          .setTimestamp(Long.MAX_VALUE);
+      
commitBuilder.addValueUpdatesBuilder().setTag(keyBytes).setStateFamily(stateFamily);
 
       V cachedValue = cachedValues.remove(key);
       if (cachedValue != null) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d06ed0f526c..8d2623c382e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.AbstractMap;
 import java.util.AbstractMap.SimpleEntry;
@@ -305,6 +307,26 @@ public class WindmillStateInternalsTest {
     return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
   }
 
+  private static void assertBuildable(
+      Windmill.WorkItemCommitRequest.Builder commitWorkRequestBuilder) {
+    Windmill.WorkItemCommitRequest.Builder clone = 
commitWorkRequestBuilder.clone();
+    if (!clone.hasKey()) {
+      clone.setKey(ByteString.EMPTY); // key is required to build
+    }
+    if (!clone.hasWorkToken()) {
+      clone.setWorkToken(1357924680L); // workToken is required to build
+    }
+
+    try {
+      clone.build();
+    } catch (Exception e) {
+      StringWriter sw = new StringWriter();
+      e.printStackTrace(new PrintWriter(sw));
+      fail(
+          "Failed to build commitRequest from: " + commitWorkRequestBuilder + 
"\n" + sw.toString());
+    }
+  }
+
   @Test
   public void testMapAddBeforeGet() throws Exception {
     StateTag<MapState<String, Integer>> addr =
@@ -647,6 +669,8 @@ public class WindmillStateInternalsTest {
             .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), 
VarIntCoder.of()))
             .collect(Collectors.toList()),
         Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new 
SimpleEntry<>(tag2, 2)));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -670,6 +694,8 @@ public class WindmillStateInternalsTest {
             .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), 
VarIntCoder.of()))
             .collect(Collectors.toList()),
         Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new 
SimpleEntry<>(tag2, null)));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -695,6 +721,8 @@ public class WindmillStateInternalsTest {
     assertEquals(
         protoKeyFromUserKey(null, StringUtf8Coder.of()),
         commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -736,6 +764,8 @@ public class WindmillStateInternalsTest {
     commitBuilder = Windmill.WorkItemCommitRequest.newBuilder();
     assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount());
     assertEquals(0, commitBuilder.getValueUpdatesCount());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -953,6 +983,8 @@ public class WindmillStateInternalsTest {
 
     multimapState.put(key, 5);
     assertThat(multimapState.get(key).read(), Matchers.containsInAnyOrder(4, 
5));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1766,6 +1798,8 @@ public class WindmillStateInternalsTest {
         builder,
         new MultimapEntryUpdate(key1, Arrays.asList(1, 2), false),
         new MultimapEntryUpdate(key2, Collections.singletonList(2), false));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1799,6 +1833,8 @@ public class WindmillStateInternalsTest {
         builder,
         new MultimapEntryUpdate(key1, Arrays.asList(1, 2), true),
         new MultimapEntryUpdate(key2, Collections.singletonList(4), true));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1825,6 +1861,8 @@ public class WindmillStateInternalsTest {
         builder,
         new MultimapEntryUpdate(key1, Collections.emptyList(), true),
         new MultimapEntryUpdate(key2, Collections.emptyList(), true));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1856,6 +1894,8 @@ public class WindmillStateInternalsTest {
         
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
     assertEquals(0, builder.getUpdatesCount());
     assertTrue(builder.getDeleteAll());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1894,6 +1934,8 @@ public class WindmillStateInternalsTest {
         
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
     assertTagMultimapUpdates(
         builder, new MultimapEntryUpdate(key1, Collections.singletonList(4), 
false));
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -1938,6 +1980,8 @@ public class WindmillStateInternalsTest {
         ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), 
Context.OUTER);
     assertArrayEquals(key1, decodedKey);
     assertTrue(entryUpdate.getDeleteAll());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2053,6 +2097,8 @@ public class WindmillStateInternalsTest {
     Windmill.WorkItemCommitRequest.Builder commitBuilder =
         Windmill.WorkItemCommitRequest.newBuilder();
     underTest.persist(commitBuilder);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2253,6 +2299,8 @@ public class WindmillStateInternalsTest {
     assertEquals("hello", 
updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
     assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
     assertEquals(IdTracker.NEW_RANGE_MIN_ID, 
updates.getInserts(0).getEntries(0).getId());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2284,6 +2332,8 @@ public class WindmillStateInternalsTest {
     assertEquals(IdTracker.NEW_RANGE_MIN_ID, 
updates.getInserts(0).getEntries(0).getId());
     assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, 
updates.getInserts(0).getEntries(1).getId());
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2331,6 +2381,8 @@ public class WindmillStateInternalsTest {
     assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
     assertEquals(IdTracker.NEW_RANGE_MIN_ID, 
updates.getInserts(0).getEntries(0).getId());
     assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, 
updates.getInserts(0).getEntries(1).getId());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2539,6 +2591,8 @@ public class WindmillStateInternalsTest {
     assertEquals(1, updates.getDeletesCount());
     assertEquals(WindmillOrderedList.MIN_TS_MICROS, 
updates.getDeletes(0).getRange().getStart());
     assertEquals(WindmillOrderedList.MAX_TS_MICROS, 
updates.getDeletes(0).getRange().getLimit());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2653,6 +2707,8 @@ public class WindmillStateInternalsTest {
     assertEquals("hello", bagUpdates.getValues(0).toStringUtf8());
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2678,6 +2734,8 @@ public class WindmillStateInternalsTest {
     assertEquals("world", tagBag.getValues(0).toStringUtf8());
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2693,6 +2751,8 @@ public class WindmillStateInternalsTest {
 
     // 1 bag update = the clear
     assertEquals(1, commitBuilder.getBagUpdatesCount());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2806,6 +2866,8 @@ public class WindmillStateInternalsTest {
         11, CoderUtils.decodeFromByteArray(accumCoder, 
bagUpdates.getValues(0).toByteArray())[0]);
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2835,6 +2897,8 @@ public class WindmillStateInternalsTest {
     assertTrue(bagUpdates.getDeleteAll());
     assertEquals(
         111, CoderUtils.decodeFromByteArray(accumCoder, 
bagUpdates.getValues(0).toByteArray())[0]);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2862,6 +2926,8 @@ public class WindmillStateInternalsTest {
         11, CoderUtils.decodeFromByteArray(accumCoder, 
tagBag.getValues(0).toByteArray())[0]);
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -2990,6 +3056,8 @@ public class WindmillStateInternalsTest {
     assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), 
watermarkHold.getTimestamps(0));
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3016,6 +3084,8 @@ public class WindmillStateInternalsTest {
 
     Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), 
STATE_FAMILY);
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3042,6 +3112,8 @@ public class WindmillStateInternalsTest {
 
     Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), 
STATE_FAMILY);
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3068,6 +3140,8 @@ public class WindmillStateInternalsTest {
 
     Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), 
STATE_FAMILY);
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3091,6 +3165,8 @@ public class WindmillStateInternalsTest {
 
     // Blind adds should not need to read the future.
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3116,6 +3192,8 @@ public class WindmillStateInternalsTest {
     assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), 
clearAndUpdate.getTimestamps(0));
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3133,6 +3211,8 @@ public class WindmillStateInternalsTest {
 
     // 1 bag update corresponds to deletion. There shouldn't be a bag update 
adding items.
     assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3200,6 +3280,8 @@ public class WindmillStateInternalsTest {
     assertTrue(valueUpdate.isInitialized());
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3220,6 +3302,8 @@ public class WindmillStateInternalsTest {
     assertEquals(0, valueUpdate.getValue().getData().size());
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test
@@ -3234,6 +3318,8 @@ public class WindmillStateInternalsTest {
     assertEquals(0, commitBuilder.getValueUpdatesCount());
 
     Mockito.verifyNoMoreInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
   }
 
   @Test

Reply via email to