lukecwik commented on a change in pull request #11941: URL: https://github.com/apache/beam/pull/11941#discussion_r436891532
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -193,7 +193,21 @@
         bundleFinalizer);
     // Register the appropriate handlers.
-    startFunctionRegistry.register(pTransformId, runner::startBundle);
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+      case PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN:
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        startFunctionRegistry.register(pTransformId, runner::startBundle);
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:

Review comment:
We can't reuse the existing `startBundle`/`finishBundle` methods, since it would be unclear to users which context they are executing in (e.g. `finishBundle` can produce output). Supporting this would require adding `startBundleForGetInitialRestriction`, `finishBundleForGetInitialRestriction`, `startBundleForPairWithRestriction` and `finishBundleForPairWithRestriction`. I could see value in this for per-bundle object lifetime management, but any such change should likely happen outside the scope of this PR.

Is there any reason not to use `setup`/`teardown` for your object cache?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -784,9 +812,8 @@ private ByteString encodeProgress(double value) throws IOException {
       default:
         // no-op
     }
-  }
-  private void startBundle() {
+    // TODO: Support caching state data across bundle boundaries.

Review comment:
Done
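
To illustrate the `setup`/`teardown` suggestion in the first review comment, here is a minimal sketch in plain Java. It does not use the Beam API: `CachingFn`, `misses` and `SetupTeardownSketch` are hypothetical names, and the `main` method only simulates a harness that may reuse one instance for several bundles. In Beam itself, the corresponding methods on a `DoFn` would carry the `@Setup` and `@Teardown` annotations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a DoFn. In Beam, setup() and teardown() would be
// methods annotated with @Setup and @Teardown.
class CachingFn {
  // The cache lives for the lifetime of the instance, not of a single bundle.
  private Map<String, Integer> cache;
  // Counts how often the "expensive" computation actually ran (illustration only).
  int misses = 0;

  void setup() {
    cache = new HashMap<>();
  }

  int processElement(String element) {
    return cache.computeIfAbsent(element, key -> {
      misses++;
      return key.length(); // stands in for an expensive lookup
    });
  }

  void teardown() {
    cache.clear();
    cache = null;
  }
}

class SetupTeardownSketch {
  public static void main(String[] args) {
    CachingFn fn = new CachingFn();
    fn.setup();
    // Two simulated bundles processed by the same instance.
    for (String element : new String[] {"a", "bb"}) {
      fn.processElement(element);
    }
    for (String element : new String[] {"bb", "ccc"}) {
      fn.processElement(element);
    }
    System.out.println(fn.misses); // prints 3: "bb" is served from the cache
    fn.teardown();
  }
}
```

Because the cache is created in `setup` and released in `teardown`, it needs no bundle-level hooks, which is why the new `startBundleFor...`/`finishBundleFor...` methods discussed above would not be required for this use case.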