boyuanzz commented on a change in pull request #11941:
URL: https://github.com/apache/beam/pull/11941#discussion_r436861894



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -193,7 +193,21 @@
               bundleFinalizer);
 
       // Register the appropriate handlers.
-      startFunctionRegistry.register(pTransformId, runner::startBundle);
+      switch (pTransform.getSpec().getUrn()) {
+        case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+        case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+        case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+          startFunctionRegistry.register(pTransformId, runner::startBundle);
+          break;
+        case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:

Review comment:
       I'm wondering whether it would be helpful to have `startBundle` for 
`PairWithRestriction` and `SplitRestriction`. Like the process fn, 
`PairWithRestriction` and `SplitRestriction` also deal with (element, 
restriction) pairs. For example, I do this in `KafkaIO` to initialize a 
consumer once per bundle instead of once per element.
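
To illustrate the per-bundle initialization described above, here is a minimal sketch in plain Java. It is not the actual `KafkaIO` or `FnApiDoFnRunner` code: `FakeConsumer`, `PairWithRestrictionSketch`, and `consumersCreatedForBundle` are hypothetical names, and the bundle lifecycle is simulated by hand rather than driven by the Beam harness.

```java
import java.util.List;

public class PerBundleInitSketch {
  /** Hypothetical stand-in for an expensive client such as a Kafka consumer. */
  static final class FakeConsumer {
    static int created = 0;

    FakeConsumer() {
      created++; // count how many consumers get constructed
    }

    long endOffset(String topic) {
      return topic.length(); // dummy value, for illustration only
    }
  }

  /** Hypothetical pair-with-restriction step that exposes bundle hooks. */
  static final class PairWithRestrictionSketch {
    private FakeConsumer consumer;

    // Called once before the first element of a bundle.
    void startBundle() {
      consumer = new FakeConsumer();
    }

    // Pairs the element with an initial restriction [0, endOffset).
    String pair(String topic) {
      return topic + " -> [0, " + consumer.endOffset(topic) + ")";
    }

    // Called once after the last element of a bundle.
    void finishBundle() {
      consumer = null;
    }
  }

  /** Simulates one bundle and reports how many consumers were created. */
  static int consumersCreatedForBundle(List<String> elements) {
    FakeConsumer.created = 0;
    PairWithRestrictionSketch fn = new PairWithRestrictionSketch();
    fn.startBundle();
    for (String element : elements) {
      fn.pair(element);
    }
    fn.finishBundle();
    return FakeConsumer.created;
  }

  public static void main(String[] args) {
    System.out.println(consumersCreatedForBundle(List.of("a", "bb", "ccc")));
  }
}
```

With the consumer created in `startBundle`, the count stays at one regardless of how many elements the bundle holds; creating it inside `pair` would instead construct one consumer per element.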

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws IOException {
       default:
         // no-op
     }
-  }
 
-  private void startBundle() {
+    // TODO: Support caching state data across bundle boundaries.

Review comment:
       Could you please reference a JIRA issue here that describes this 
support in more detail?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -193,7 +193,21 @@
               bundleFinalizer);
 
       // Register the appropriate handlers.
-      startFunctionRegistry.register(pTransformId, runner::startBundle);
+      switch (pTransform.getSpec().getUrn()) {
+        case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+        case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+        case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+          startFunctionRegistry.register(pTransformId, runner::startBundle);
+          break;
+        case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:

Review comment:
       I just understood the idea behind `setup`/`teardown`. Thanks!



