scwhittle commented on code in PR #37464:
URL: https://github.com/apache/beam/pull/37464#discussion_r2754538484


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -246,6 +241,10 @@ public void start(
     this.sideInputStateFetcher = sideInputStateFetcher;
     // Snapshot the limits for entire bundle processing.
     this.operationalLimits = globalConfigHandle.getConfig().operationalLimits();

Review Comment:
   nit: get the config just once? It's an atomic op.
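A minimal sketch of the nit, assuming start() reads more than one field from the global config (the other added lines of this hunk are not shown above). GlobalConfig, OperationalLimits and ConfigHandle are hypothetical, simplified stand-ins, not the worker's real classes. Reading into one local means every snapshotted field comes from the same config, even if an update lands mid-start:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for StreamingGlobalConfig / StreamingGlobalConfigHandle.
final class ConfigSnapshotSketch {
  static final class OperationalLimits {
    final long maxCommitBytes;

    OperationalLimits(long maxCommitBytes) {
      this.maxCommitBytes = maxCommitBytes;
    }
  }

  static final class GlobalConfig {
    final OperationalLimits operationalLimits;
    final boolean enableStateTagEncodingV2;

    GlobalConfig(OperationalLimits operationalLimits, boolean enableStateTagEncodingV2) {
      this.operationalLimits = operationalLimits;
      this.enableStateTagEncodingV2 = enableStateTagEncodingV2;
    }
  }

  static final class ConfigHandle {
    private final AtomicReference<GlobalConfig> current;

    ConfigHandle(GlobalConfig initial) {
      this.current = new AtomicReference<>(initial);
    }

    // Each call is a separate atomic read; two calls may see two different configs.
    GlobalConfig getConfig() {
      return current.get();
    }

    void setConfig(GlobalConfig config) {
      current.set(config);
    }
  }

  OperationalLimits operationalLimits;
  boolean useStateTagEncodingV2;

  void start(ConfigHandle handle) {
    // Read the config once, then derive every bundle-level snapshot from it.
    GlobalConfig config = handle.getConfig();
    this.operationalLimits = config.operationalLimits;
    this.useStateTagEncodingV2 = config.enableStateTagEncodingV2;
  }
}
```

Later config updates do not affect the snapshot until the next start() call.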



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java:
##########
@@ -406,4 +409,58 @@ public void stateSamplingInStreaming() {
       sampler.stop();
     }
   }
+
+  @Test
+  public void testStateTagEncodingBasedOnConfig() {
+    for (Class<?> expectedEncoding :
+        Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) {
+      CounterSet counterSet = new CounterSet();
+      ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+      StreamingGlobalConfigHandle globalConfigHandle =
+          new FixedGlobalConfigHandle(
+              StreamingGlobalConfig.builder()
+                  .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding))

Review Comment:
   nit: it seems clearer to have the test iterate over a boolean isV2EncodingEnabled, and then just calculate the expected class below.
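A rough sketch of the suggested loop shape. TagEncodingV1, TagEncodingV2 and encodingFor are hypothetical stand-ins for WindmillTagEncodingV1, WindmillTagEncodingV2 and however the execution context actually picks its encoding; only the loop structure is the point:

```java
import java.util.ArrayList;
import java.util.List;

final class EncodingLoopSketch {
  // Hypothetical stand-ins for WindmillTagEncodingV1 / WindmillTagEncodingV2.
  static final class TagEncodingV1 {}

  static final class TagEncodingV2 {}

  // Hypothetical factory standing in for the code under test.
  static Object encodingFor(boolean enableV2) {
    return enableV2 ? new TagEncodingV2() : new TagEncodingV1();
  }

  static List<String> run() {
    List<String> checked = new ArrayList<>();
    // Iterate over the config flag, then derive the expected class from it.
    for (boolean isV2EncodingEnabled : new boolean[] {false, true}) {
      Class<?> expectedEncoding =
          isV2EncodingEnabled ? TagEncodingV2.class : TagEncodingV1.class;
      Object actual = encodingFor(isV2EncodingEnabled);
      if (!expectedEncoding.isInstance(actual)) {
        throw new AssertionError(
            "expected " + expectedEncoding.getSimpleName()
                + " but got " + actual.getClass().getSimpleName());
      }
      checked.add(expectedEncoding.getSimpleName());
    }
    return checked;
  }
}
```

The builder call then reads .setEnableStateTagEncodingV2(isV2EncodingEnabled) directly, instead of comparing classes.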



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java:
##########
@@ -406,4 +409,58 @@ public void stateSamplingInStreaming() {
       sampler.stop();
     }
   }
+
+  @Test
+  public void testStateTagEncodingBasedOnConfig() {
+    for (Class<?> expectedEncoding :
+        Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) {
+      CounterSet counterSet = new CounterSet();
+      ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+      StreamingGlobalConfigHandle globalConfigHandle =
+          new FixedGlobalConfigHandle(
+              StreamingGlobalConfig.builder()
+                  .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding))
+                  .build());
+      stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
+      executionContext =
+          new StreamingModeExecutionContext(
+              counterSet,
+              COMPUTATION_ID,
+              new ReaderCache(Duration.standardMinutes(1), Executors.newCachedThreadPool()),
+              stateNameMap,
+              WindmillStateCache.builder()
+                  .setSizeMb(options.getWorkerCacheMb())
+                  .build()
+                  .forComputation("comp"),
+              StreamingStepMetricsContainer.createRegistry(),
+              new DataflowExecutionStateTracker(
+                  ExecutionStateSampler.newForTest(),
+                  executionStateRegistry.getState(
+                      NameContext.forStage("stage"), "other", null, NoopProfileScope.NOOP),
+                  counterSet,
+                  PipelineOptionsFactory.create(),
+                  "test-work-item-id"),
+              executionStateRegistry,
+              globalConfigHandle,
+              Long.MAX_VALUE,
+              /*throwExceptionOnLargeOutput=*/ false);
+      Windmill.WorkItemCommitRequest.Builder outputBuilder =
+          Windmill.WorkItemCommitRequest.newBuilder();
+      NameContext nameContext = NameContextsForTests.nameContextForTest();
+      DataflowOperationContext operationContext =
+          executionContext.createOperationContext(nameContext);
+      StreamingModeExecutionContext.StepContext stepContext =
+          executionContext.getStepContext(operationContext);
+
+      executionContext.start(

Review Comment:
   You could move the executionContext out of the loop and use FakeGlobalConfigHandle to change the config, to verify that the new value is observed when start is called.
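A minimal sketch of that test shape, assuming the context snapshots the encoding flag in start(). FakeConfigHandle and ExecutionContextSketch are hypothetical stand-ins; FakeGlobalConfigHandle's real update method is not shown in this thread, so setV2Enabled is made up for illustration:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ObserveConfigOnStartSketch {
  // Hypothetical stand-in for FakeGlobalConfigHandle: a mutable config source.
  static final class FakeConfigHandle {
    private final AtomicBoolean enableV2 = new AtomicBoolean(false);

    boolean isV2Enabled() {
      return enableV2.get();
    }

    void setV2Enabled(boolean value) {
      enableV2.set(value);
    }
  }

  // Hypothetical stand-in for StreamingModeExecutionContext.
  static final class ExecutionContextSketch {
    private final FakeConfigHandle handle;
    private String encoding = "unset";

    ExecutionContextSketch(FakeConfigHandle handle) {
      this.handle = handle;
    }

    // Snapshot the config per bundle, as start() does for the operational limits.
    void start() {
      encoding = handle.isV2Enabled() ? "V2" : "V1";
    }

    String encoding() {
      return encoding;
    }
  }

  static String[] run() {
    FakeConfigHandle handle = new FakeConfigHandle();
    // One context built outside the loop; only the config changes per iteration.
    ExecutionContextSketch context = new ExecutionContextSketch(handle);
    String[] observed = new String[2];
    int i = 0;
    for (boolean isV2EncodingEnabled : new boolean[] {false, true}) {
      handle.setV2Enabled(isV2EncodingEnabled);
      context.start();
      observed[i++] = context.encoding();
    }
    return observed;
  }
}
```

This also exercises the case a per-iteration context would miss: a config change between two bundles on the same context.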



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -209,6 +210,10 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
       pipelineConfig.setUserWorkerJobSettings(settings);
     }
 
+    if (Objects.equals(2, config.getStreamingEngineStateTagEncodingVersion())) {

Review Comment:
   Maybe we should throw an exception if it is greater than 2? That would indicate something went wrong, and otherwise we could corrupt state.
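A rough sketch of the suggested check. It assumes the version getter returns a nullable Integer (suggested by the Objects.equals call, but not confirmed here) and that anything other than 2, up to the supported maximum, keeps the V1 default. TagEncoding and select are hypothetical names:

```java
import java.util.Objects;

final class TagEncodingVersionSketch {
  // Hypothetical stand-in for the worker's choice between tag encodings.
  enum TagEncoding {
    V1,
    V2
  }

  static final int MAX_SUPPORTED_VERSION = 2;

  static TagEncoding select(Integer version) {
    if (version != null && version > MAX_SUPPORTED_VERSION) {
      // A newer, unknown encoding: silently falling back to V1 could corrupt state.
      throw new IllegalStateException(
          "Unsupported state tag encoding version " + version
              + "; this worker supports up to " + MAX_SUPPORTED_VERSION);
    }
    // Mirrors the original check: only an explicit 2 enables V2.
    return Objects.equals(MAX_SUPPORTED_VERSION, version) ? TagEncoding.V2 : TagEncoding.V1;
  }
}
```

Failing fast here surfaces a worker/backend version mismatch at config time instead of as unreadable state later.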



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
