This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 72fccfc7648 Fix protobuf build error in WindmillMap.persistDirect()
for a removal in Dataflow Streaming Java Legacy Runner without Streaming Engine
(#32893)
72fccfc7648 is described below
commit 72fccfc76489a1539f670d4143c77efa3329b601
Author: Minbo Bae <[email protected]>
AuthorDate: Thu Oct 24 00:44:19 2024 -0700
Fix protobuf build error in WindmillMap.persistDirect() for a removal in
Dataflow Streaming Java Legacy Runner without Streaming Engine (#32893)
* Check WorkItemCommitRequest is buildable in WindmillStateInternalsTest
---
CHANGES.md | 1 +
.../worker/windmill/state/WindmillMap.java | 7 +-
.../windmill/state/WindmillStateInternalsTest.java | 86 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6ed10f6c49d..f873455cd66 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -91,6 +91,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners
([#18592](https://github.com/apache/beam/issues/18592),
[#31381](https://github.com/apache/beam/issues/31381)).
+* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming
Java Legacy Runner without Streaming Engine
([#32892](https://github.com/apache/beam/issues/32892)).
## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN]
(Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
index aed03f33e6d..b17631a8bd0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java
@@ -137,12 +137,7 @@ public class WindmillMap<K, V> extends
AbstractWindmillMap<K, V> {
keyCoder.encode(key, keyStream, Coder.Context.OUTER);
ByteString keyBytes = keyStream.toByteString();
// Leaving data blank means that we delete the tag.
- commitBuilder
- .addValueUpdatesBuilder()
- .setTag(keyBytes)
- .setStateFamily(stateFamily)
- .getValueBuilder()
- .setTimestamp(Long.MAX_VALUE);
+
commitBuilder.addValueUpdatesBuilder().setTag(keyBytes).setStateFamily(stateFamily);
V cachedValue = cachedValues.remove(key);
if (cachedValue != null) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d06ed0f526c..8d2623c382e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
@@ -305,6 +307,26 @@ public class WindmillStateInternalsTest {
return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
}
+ private static void assertBuildable(
+ Windmill.WorkItemCommitRequest.Builder commitWorkRequestBuilder) {
+ Windmill.WorkItemCommitRequest.Builder clone =
commitWorkRequestBuilder.clone();
+ if (!clone.hasKey()) {
+ clone.setKey(ByteString.EMPTY); // key is required to build
+ }
+ if (!clone.hasWorkToken()) {
+ clone.setWorkToken(1357924680L); // workToken is required to build
+ }
+
+ try {
+ clone.build();
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ fail(
+ "Failed to build commitRequest from: " + commitWorkRequestBuilder +
"\n" + sw.toString());
+ }
+ }
+
@Test
public void testMapAddBeforeGet() throws Exception {
StateTag<MapState<String, Integer>> addr =
@@ -647,6 +669,8 @@ public class WindmillStateInternalsTest {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(),
VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new
SimpleEntry<>(tag2, 2)));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -670,6 +694,8 @@ public class WindmillStateInternalsTest {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(),
VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new
SimpleEntry<>(tag2, null)));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -695,6 +721,8 @@ public class WindmillStateInternalsTest {
assertEquals(
protoKeyFromUserKey(null, StringUtf8Coder.of()),
commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -736,6 +764,8 @@ public class WindmillStateInternalsTest {
commitBuilder = Windmill.WorkItemCommitRequest.newBuilder();
assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount());
assertEquals(0, commitBuilder.getValueUpdatesCount());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -953,6 +983,8 @@ public class WindmillStateInternalsTest {
multimapState.put(key, 5);
assertThat(multimapState.get(key).read(), Matchers.containsInAnyOrder(4,
5));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1766,6 +1798,8 @@ public class WindmillStateInternalsTest {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), false),
new MultimapEntryUpdate(key2, Collections.singletonList(2), false));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1799,6 +1833,8 @@ public class WindmillStateInternalsTest {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), true),
new MultimapEntryUpdate(key2, Collections.singletonList(4), true));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1825,6 +1861,8 @@ public class WindmillStateInternalsTest {
builder,
new MultimapEntryUpdate(key1, Collections.emptyList(), true),
new MultimapEntryUpdate(key2, Collections.emptyList(), true));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1856,6 +1894,8 @@ public class WindmillStateInternalsTest {
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertEquals(0, builder.getUpdatesCount());
assertTrue(builder.getDeleteAll());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1894,6 +1934,8 @@ public class WindmillStateInternalsTest {
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertTagMultimapUpdates(
builder, new MultimapEntryUpdate(key1, Collections.singletonList(4),
false));
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -1938,6 +1980,8 @@ public class WindmillStateInternalsTest {
ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(),
Context.OUTER);
assertArrayEquals(key1, decodedKey);
assertTrue(entryUpdate.getDeleteAll());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2053,6 +2097,8 @@ public class WindmillStateInternalsTest {
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2253,6 +2299,8 @@ public class WindmillStateInternalsTest {
assertEquals("hello",
updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID,
updates.getInserts(0).getEntries(0).getId());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2284,6 +2332,8 @@ public class WindmillStateInternalsTest {
assertEquals(IdTracker.NEW_RANGE_MIN_ID,
updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1,
updates.getInserts(0).getEntries(1).getId());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2331,6 +2381,8 @@ public class WindmillStateInternalsTest {
assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID,
updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1,
updates.getInserts(0).getEntries(1).getId());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2539,6 +2591,8 @@ public class WindmillStateInternalsTest {
assertEquals(1, updates.getDeletesCount());
assertEquals(WindmillOrderedList.MIN_TS_MICROS,
updates.getDeletes(0).getRange().getStart());
assertEquals(WindmillOrderedList.MAX_TS_MICROS,
updates.getDeletes(0).getRange().getLimit());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2653,6 +2707,8 @@ public class WindmillStateInternalsTest {
assertEquals("hello", bagUpdates.getValues(0).toStringUtf8());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2678,6 +2734,8 @@ public class WindmillStateInternalsTest {
assertEquals("world", tagBag.getValues(0).toStringUtf8());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2693,6 +2751,8 @@ public class WindmillStateInternalsTest {
// 1 bag update = the clear
assertEquals(1, commitBuilder.getBagUpdatesCount());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2806,6 +2866,8 @@ public class WindmillStateInternalsTest {
11, CoderUtils.decodeFromByteArray(accumCoder,
bagUpdates.getValues(0).toByteArray())[0]);
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2835,6 +2897,8 @@ public class WindmillStateInternalsTest {
assertTrue(bagUpdates.getDeleteAll());
assertEquals(
111, CoderUtils.decodeFromByteArray(accumCoder,
bagUpdates.getValues(0).toByteArray())[0]);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2862,6 +2926,8 @@ public class WindmillStateInternalsTest {
11, CoderUtils.decodeFromByteArray(accumCoder,
tagBag.getValues(0).toByteArray())[0]);
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -2990,6 +3056,8 @@ public class WindmillStateInternalsTest {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
watermarkHold.getTimestamps(0));
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3016,6 +3084,8 @@ public class WindmillStateInternalsTest {
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"),
STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3042,6 +3112,8 @@ public class WindmillStateInternalsTest {
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"),
STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3068,6 +3140,8 @@ public class WindmillStateInternalsTest {
Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"),
STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3091,6 +3165,8 @@ public class WindmillStateInternalsTest {
// Blind adds should not need to read the future.
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3116,6 +3192,8 @@ public class WindmillStateInternalsTest {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
clearAndUpdate.getTimestamps(0));
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3133,6 +3211,8 @@ public class WindmillStateInternalsTest {
// 1 bag update corresponds to deletion. There shouldn't be a bag update
adding items.
assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3200,6 +3280,8 @@ public class WindmillStateInternalsTest {
assertTrue(valueUpdate.isInitialized());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3220,6 +3302,8 @@ public class WindmillStateInternalsTest {
assertEquals(0, valueUpdate.getValue().getData().size());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test
@@ -3234,6 +3318,8 @@ public class WindmillStateInternalsTest {
assertEquals(0, commitBuilder.getValueUpdatesCount());
Mockito.verifyNoMoreInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
}
@Test