scwhittle commented on code in PR #37464:
URL: https://github.com/apache/beam/pull/37464#discussion_r2754538484
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -246,6 +241,10 @@ public void start(
this.sideInputStateFetcher = sideInputStateFetcher;
// Snapshot the limits for entire bundle processing.
this.operationalLimits =
globalConfigHandle.getConfig().operationalLimits();
Review Comment:
nit: get the config just once? It's an atomic op.
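A minimal sketch of the idea, assuming start() needs more than one value from the config. `Config`, `ConfigHandle`, and `Context` are hypothetical stand-ins, not the Beam classes; the point is only that one atomic read feeds every field, so all of them come from the same snapshot.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for illustration; these are not the Beam classes.
final class ConfigSnapshotSketch {

  /** Immutable config value holding two settings. */
  static final class Config {
    final long maxOutputBytes;
    final boolean stateTagEncodingV2;

    Config(long maxOutputBytes, boolean stateTagEncodingV2) {
      this.maxOutputBytes = maxOutputBytes;
      this.stateTagEncodingV2 = stateTagEncodingV2;
    }
  }

  /** Handle whose getConfig() is a single atomic read of the current config. */
  static final class ConfigHandle {
    private final AtomicReference<Config> current;
    private int reads; // demo-only counter, not thread safe

    ConfigHandle(Config initial) {
      this.current = new AtomicReference<>(initial);
    }

    Config getConfig() {
      reads++;
      return current.get();
    }

    int reads() {
      return reads;
    }
  }

  static final class Context {
    long maxOutputBytes;
    boolean stateTagEncodingV2;

    void start(ConfigHandle handle) {
      // Read the config once, then derive every per-bundle value from that snapshot.
      Config config = handle.getConfig();
      this.maxOutputBytes = config.maxOutputBytes;
      this.stateTagEncodingV2 = config.stateTagEncodingV2;
    }
  }

  public static void main(String[] args) {
    ConfigHandle handle = new ConfigHandle(new Config(1024L, true));
    Context context = new Context();
    context.start(handle);
    System.out.println("getConfig() calls in start(): " + handle.reads());
  }
}
```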
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java:
##########
@@ -406,4 +409,58 @@ public void stateSamplingInStreaming() {
sampler.stop();
}
}
+
+ @Test
+ public void testStateTagEncodingBasedOnConfig() {
+ for (Class<?> expectedEncoding :
+ Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) {
+ CounterSet counterSet = new CounterSet();
+ ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+ StreamingGlobalConfigHandle globalConfigHandle =
+ new FixedGlobalConfigHandle(
+ StreamingGlobalConfig.builder()
+ .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding))
Review Comment:
nit: it seems clearer to have the test iterate over a boolean, e.g.
isV2EncodingEnabled, and then just calculate the expected class below.
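Roughly what I have in mind, as a sketch only. `TagEncodingV1`, `TagEncodingV2`, and `selectEncoding` are hypothetical stand-ins for the real encoding classes and the code under test; the loop variable is the config flag, and the expected class is derived from it.

```java
// Hypothetical stand-ins for illustration; these are not the Beam classes.
final class EncodingLoopSketch {

  static final class TagEncodingV1 {}

  static final class TagEncodingV2 {}

  /** Stand-in for the code under test: picks an encoding from the config flag. */
  static Object selectEncoding(boolean v2Enabled) {
    return v2Enabled ? new TagEncodingV2() : new TagEncodingV1();
  }

  /** Iterates over the flag itself and computes the expected class from it. */
  static void checkBothSettings() {
    for (boolean isV2EncodingEnabled : new boolean[] {false, true}) {
      Class<?> expectedEncoding =
          isV2EncodingEnabled ? TagEncodingV2.class : TagEncodingV1.class;
      Object actual = selectEncoding(isV2EncodingEnabled);
      if (!expectedEncoding.isInstance(actual)) {
        throw new AssertionError(
            "isV2EncodingEnabled=" + isV2EncodingEnabled
                + " expected " + expectedEncoding.getSimpleName()
                + " but got " + actual.getClass().getSimpleName());
      }
    }
  }

  public static void main(String[] args) {
    checkBothSettings();
    System.out.println("both settings checked");
  }
}
```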
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java:
##########
@@ -406,4 +409,58 @@ public void stateSamplingInStreaming() {
sampler.stop();
}
}
+
+ @Test
+ public void testStateTagEncodingBasedOnConfig() {
+ for (Class<?> expectedEncoding :
+ Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) {
+ CounterSet counterSet = new CounterSet();
+ ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+ StreamingGlobalConfigHandle globalConfigHandle =
+ new FixedGlobalConfigHandle(
+ StreamingGlobalConfig.builder()
+ .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding))
+ .build());
+ stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
+ executionContext =
+ new StreamingModeExecutionContext(
+ counterSet,
+ COMPUTATION_ID,
+ new ReaderCache(Duration.standardMinutes(1), Executors.newCachedThreadPool()),
+ stateNameMap,
+ WindmillStateCache.builder()
+ .setSizeMb(options.getWorkerCacheMb())
+ .build()
+ .forComputation("comp"),
+ StreamingStepMetricsContainer.createRegistry(),
+ new DataflowExecutionStateTracker(
+ ExecutionStateSampler.newForTest(),
+ executionStateRegistry.getState(
+ NameContext.forStage("stage"), "other", null, NoopProfileScope.NOOP),
+ counterSet,
+ PipelineOptionsFactory.create(),
+ "test-work-item-id"),
+ executionStateRegistry,
+ globalConfigHandle,
+ Long.MAX_VALUE,
+ /*throwExceptionOnLargeOutput=*/ false);
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ NameContext nameContext = NameContextsForTests.nameContextForTest();
+ DataflowOperationContext operationContext =
+ executionContext.createOperationContext(nameContext);
+ StreamingModeExecutionContext.StepContext stepContext =
+ executionContext.getStepContext(operationContext);
+
+ executionContext.start(
Review Comment:
You could move the executionContext out of the loop and use
FakeGlobalConfigHandle to change the config, verifying that the new value is
observed when start is called.
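A sketch of the shape of that test, under the assumption that the context snapshots the config in start(). `MutableConfigHandle` and `Context` below are hypothetical stand-ins, not FakeGlobalConfigHandle or StreamingModeExecutionContext; one context is created up front and the config is changed between start() calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration; these are not the Beam classes.
final class ConfigRefreshSketch {

  /** Config handle whose value a test can change after construction. */
  static final class MutableConfigHandle {
    private final AtomicBoolean v2Enabled = new AtomicBoolean(false);

    boolean isV2Enabled() {
      return v2Enabled.get();
    }

    void setV2Enabled(boolean enabled) {
      v2Enabled.set(enabled);
    }
  }

  /** Context that snapshots the flag once per start() call. */
  static final class Context {
    private final MutableConfigHandle handle;
    private boolean usesV2;

    Context(MutableConfigHandle handle) {
      this.handle = handle;
    }

    void start() {
      // The value read here stays fixed until the next start().
      this.usesV2 = handle.isV2Enabled();
    }

    boolean usesV2() {
      return usesV2;
    }
  }

  public static void main(String[] args) {
    MutableConfigHandle handle = new MutableConfigHandle();
    Context context = new Context(handle); // created once, outside any loop

    context.start();
    System.out.println("after first start: usesV2=" + context.usesV2());

    handle.setV2Enabled(true); // change the config between bundles
    context.start();
    System.out.println("after second start: usesV2=" + context.usesV2());
  }
}
```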
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -209,6 +210,10 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
pipelineConfig.setUserWorkerJobSettings(settings);
}
+ if (Objects.equals(2, config.getStreamingEngineStateTagEncodingVersion())) {
Review Comment:
Maybe we should throw an exception if it is greater than 2? That indicates
something went wrong, and we could otherwise corrupt state.
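Something along these lines, as a sketch. `isV2Enabled` is a hypothetical helper, and the exception type and the treatment of values below 2 as V1 are assumptions on my part, not what the PR does.

```java
// Hypothetical helper for illustration; not code from the PR.
final class TagEncodingVersionSketch {

  /** Highest encoding version this sketch understands. */
  static final int MAX_KNOWN_VERSION = 2;

  /**
   * Returns true when the configured version selects the V2 encoding.
   * Versions above MAX_KNOWN_VERSION are rejected instead of silently
   * falling back to V1, since writing with the wrong encoding could corrupt state.
   */
  static boolean isV2Enabled(int version) {
    if (version > MAX_KNOWN_VERSION) {
      throw new IllegalStateException(
          "Unsupported state tag encoding version: " + version);
    }
    // Assumption: anything below 2 (including an unset value of 0) means V1.
    return version == 2;
  }

  public static void main(String[] args) {
    System.out.println("version 1 -> v2 enabled: " + isV2Enabled(1));
    System.out.println("version 2 -> v2 enabled: " + isV2Enabled(2));
    try {
      isV2Enabled(3);
    } catch (IllegalStateException e) {
      System.out.println("version 3 rejected: " + e.getMessage());
    }
  }
}
```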