boyuanzz commented on a change in pull request #11941: URL: https://github.com/apache/beam/pull/11941#discussion_r436861894
########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -193,7 +193,21 @@ bundleFinalizer); // Register the appropriate handlers. - startFunctionRegistry.register(pTransformId, runner::startBundle); + switch (pTransform.getSpec().getUrn()) { + case PTransformTranslation.PAR_DO_TRANSFORM_URN: + case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: + case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN: + startFunctionRegistry.register(pTransformId, runner::startBundle); + break; + case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN: Review comment: I'm thinking whether it would be helpful to have `startBundle` for `PairWithRestriction` and `SplitRestriction`. Similar to process fn, `PairWithRestriction` and `SplitRestriction` also deal with (element, restriction). For example, I do this in `KafkaIO` to initialize consumer per bundle instead of per element. ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws IOException { default: // no-op } - } - private void startBundle() { + // TODO: Support caching state data across bundle boundaries. Review comment: Could you please add a JIRA here, which elaborate this support with more details? ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -193,7 +193,21 @@ bundleFinalizer); // Register the appropriate handlers. - startFunctionRegistry.register(pTransformId, runner::startBundle); + switch (pTransform.getSpec().getUrn()) { + case PTransformTranslation.PAR_DO_TRANSFORM_URN: + case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN: + case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN: + startFunctionRegistry.register(pTransformId, runner::startBundle); + break; + case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN: Review comment: I just got the idea of `setup` /`teardown`. Thanks! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org