[ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=164439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164439
 ]

ASF GitHub Bot logged work on BEAM-3741:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/18 17:42
            Start Date: 09/Nov/18 17:42
    Worklog Time Spent: 10m 
    Work Description: lukecwik closed pull request #6963: [BEAM-3741] Proto changes for reporting backlog/splitting/finalizing bundles.
URL: https://github.com/apache/beam/pull/6963
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 915686de6b3..39229b1fb52 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -119,6 +119,7 @@ message InstructionRequest {
     ProcessBundleRequest process_bundle = 1001;
     ProcessBundleProgressRequest process_bundle_progress = 1002;
     ProcessBundleSplitRequest process_bundle_split = 1003;
+    FinalizeBundleRequest finalize_bundle = 1004;
   }
 }
 
@@ -142,6 +143,7 @@ message InstructionResponse {
     ProcessBundleResponse process_bundle = 1001;
     ProcessBundleProgressResponse process_bundle_progress = 1002;
     ProcessBundleSplitResponse process_bundle_split = 1003;
+    FinalizeBundleResponse finalize_bundle = 1004;
   }
 }
 
@@ -184,55 +186,78 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
-// Represents a partition of the bundle into two bundles: a "primary" and
-// a "residual", with the following properties:
-// - The work in primary and residual doesn't overlap, and combined, adds up
-//   to the work in the current bundle if the split hadn't happened.
-// - The current bundle, if it keeps executing, will have done none of the
-//   work under residual roots.
-// - The current bundle, if no further splits happen, will have done exactly
-//   the work under primary_roots.
-// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
-message BundleSplit {
-  // One of the root applications specifying the scope of work for a bundle.
-  message Application {
-    // (Required) The primitive transform to which to pass the element
-    string ptransform_id = 1;
+// One of the applications specifying the scope of work for a bundle.
+// See https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9 for further details.
+message BundleApplication {
+  // (Required) The primitive transform to which to pass the element
+  string ptransform_id = 1;
+
+  // (Required) Name of the transform's input to which to pass the element.
+  string input_id = 2;
 
-    // (Required) Name of the transform's input to which to pass the element.
-    string input_id = 2;
+  // (Required) The encoded element to pass to the transform.
+  bytes element = 3;
 
-    // (Required) The encoded element to pass to the transform.
-    bytes element = 3;
+  // The map is keyed by the local output name of the PTransform. Each
+  // value represents a lower bound on the timestamps of elements that
+  // are produced by this PTransform into each of its output PCollections
+  // when invoked with this application.
+  map<string, google.protobuf.Timestamp> output_watermarks = 4;
 
-    // Approximate lower bounds on timestamps of elements that this PTransform
-    // will produce into each of its output PCollections, when invoked on this
-    // element. Keyed by the transform's local output name.
-    map<string, int64> output_watermarks = 4;
+  // Represents an estimate for the amount of currently outstanding work.
+  message Backlog {
+    // This informs Runners on how to aggregate the backlog
+    // being reported across multiple active bundles. Backlogs
+    // are aggregated using the set of partitions.
+    //
+    // For example SplittableDoFn's which consume elements from:
+    //  * a globally shared resource such as a Pubsub queue should set this
+    //    to “”.
+    //  * a shared partitioned resource should use the partition identifier.
+    //  * a uniquely partitioned resource such as a file range should set this to
+    //    file name + start offset.
+    bytes partition = 1;
+
+    // The estimate for the backlog.
+    oneof value {
+      // Represents an estimate for the amount of outstanding work. Values
+      // compare lexicographically.
+      bytes bytes = 1000;
 
-    // Approximate fraction of all work of the current bundle (before split)
-    // represented by invoking this Application and its downstream applications.
-    // The sum of fraction_of_work between all primary_roots and residual_roots
-    // must add up to approximately 1.0.
-    google.protobuf.DoubleValue fraction_of_work = 5;
+      // Whether the backlog is unknown.
+      bool is_unknown = 1001;
+    }
   }
 
-  // An an Application should be scheduled after a delay.
-  message DelayedApplication {
-    // The delay in seconds (lower bound).
-    double delay_sec = 1;
+  // (Required) An estimate for the amount outstanding work related to
+  // this application.
+  Backlog backlog = 5;
 
-    // (Required) The application that should be scheduled.
-    Application application = 2;
-  }
+  // (Required) Whether this application potentially produces an unbounded
+  // amount of data. Note that this should only be set to BOUNDED if and
+  // only if the application is known to produce a finite amount of output.
+  //
+  // Note that this is different from the backlog as the backlog represents
+  // how much work there is currently outstanding.
+  org.apache.beam.model.pipeline.v1.IsBounded.Enum is_bounded = 6;
 
-  // Root applications that should replace the current bundle.
-  repeated Application primary_roots = 1;
+  // Contains additional monitoring information related to this application.
+  //
+  // Each application is able to report information that some runners
+  // will use consume when providing a UI or for making scaling and performance
+  // decisions. See https://s.apache.org/beam-bundles-backlog-splitting for 
+  // details about what types of signals may be useful to report.
+  repeated MonitoringInfo monitoring_infos = 7;
+}
 
-  // Root applications that have been removed from the current bundle and
-  // have to be executed in a separate bundle (e.g. in parallel on a different
-  // worker, or after the current bundle completes, etc.)
-  repeated DelayedApplication residual_roots = 2;
+// An Application should be scheduled for execution after a delay.
+message DelayedBundleApplication {
+  // Recommended time at which the application should be scheduled to execute
+  // by the runner. Times in the past may be scheduled to execute immediately.
+  google.protobuf.Timestamp requested_execution_time = 1;
+
+  // (Required) The application that should be scheduled.
+  BundleApplication application = 2;
 }
 
 // A request to process a given bundle.
@@ -247,20 +272,25 @@ message ProcessBundleRequest {
   repeated bytes cache_tokens = 2;
 }
 
-// Stable
 message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   // DEPRECATED
   Metrics metrics = 1;
 
-  // (Optional) Specifies that the bundle has been split since the last
-  // ProcessBundleProgressResponse was sent.
-  BundleSplit split = 2;
+  // (Optional) Specifies that the bundle has not been completed and the
+  // following applications need to be scheduled and executed in the future.
+  repeated DelayedBundleApplication residual_roots = 2;
 
   // (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated MonitoringInfo monitoring_infos = 3;
+
+  // (Optional) Specifies that the runner must callback to this worker
+  // once the output of the bundle is committed. The Runner must send a
+  // FinalizeBundleRequest with the instruction id of the ProcessBundleRequest
+  // that is related to this ProcessBundleResponse.
+  bool requires_finalization = 4;
 }
 
 // A request to report progress information for a given bundle.
@@ -310,7 +340,7 @@ message MonitoringInfo {
   map<string, string> labels = 5;
 
   // The walltime of the most recent update.
-  // Useful for aggregation for Latest types such as LatestInt64.
+  // Useful for aggregation for latest types such as LatestInt64.
   google.protobuf.Timestamp timestamp = 6;
 }
 
@@ -573,36 +603,62 @@ message ProcessBundleProgressResponse {
   // DEPRECATED (Required)
   Metrics metrics = 1;
 
-  // (Optional) Specifies that the bundle has been split since the last
-  // ProcessBundleProgressResponse was sent.
-  BundleSplit split = 2;
-
   // (Required) The list of metrics or other MonitoredState
   // collected while processing this bundle.
   repeated MonitoringInfo monitoring_infos = 3;
+
+  // The list of currently active primary roots that are being
+  // executed. Required to be populated for PTransforms which can be split.
+  repeated BundleApplication primary_roots = 4;
 }
 
+// Represents a request to the SDK to split a currently active bundle.
 message ProcessBundleSplitRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
   string instruction_reference = 1;
 
-  // Specifies that the runner would like the bundle to split itself using
-  // BundleSplit, and give up some of the work that the bundle hasn't started
-  // doing yet, so that it can be done in a separate bundle (perhaps in
-  // parallel with the current bundle).
+  // (Required) Specifies that the Runner would like the bundle to split itself
+  // such that it performs no more work than the backlog specified for each
+  // PTransform. The interpretation of how much work should be processed is up
+  // to the PTransform.
   //
-  // The value is the fraction of unstarted work to keep. E.g. 0 means give up
-  // as much as possible of unstarted work (e.g. checkpoint), 0.5 means give
-  // up about half of the unstarted work, etc.
-  // This is a hint and the value is approximate.
+  // For example, A backlog of "" tells the SDK to perform as little work as
+  // possible, effectively checkpointing when able. The remaining backlog
+  // will be relative to the backlog reported during processing.
   //
-  // The value is relative to the current scope of work of the bundle.
-  google.protobuf.DoubleValue fraction_of_remainder = 2;
+  // If the backlog is unspecified for a PTransform, the runner would like 
+  // the SDK to process all data received for that PTransform.
+  map<string, bytes> backlog_remaining = 2;
 }
 
+// Represents a partition of the bundle: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual_roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
 message ProcessBundleSplitResponse {
-  // Empty.
+  // Root applications that should replace the current bundle.
+  repeated BundleApplication primary_roots = 1;
+
+  // Root applications that have been removed from the current bundle and
+  // have to be executed in a separate bundle (e.g. in parallel on a different
+  // worker, or after the current bundle completes, etc.)
+  repeated DelayedBundleApplication residual_roots = 2;
+}
+
+message FinalizeBundleRequest {
+  // (Required) A reference to a completed process bundle request with the given
+  // instruction id.
+  string instruction_reference = 1;
+}
+
+message FinalizeBundleResponse {
+  // Empty
 }
 
 /*
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 194797f166c..3fba61bedc4 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -446,6 +446,9 @@ message ParDoPayload {
 
   // (Required if splittable == true) Id of the restriction coder.
   string restriction_coder_id = 7;
+
+  // (Optional) Only set when this ParDo can request bundle finalization.
+  bool requests_finalization = 8;
 }
 
 // Parameters that a UDF might require.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
index 7089576adf2..fd42f2f93c5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -76,7 +76,7 @@ private RemoteStageEvaluator(PTransformNode transform) throws Exception {
               BundleFactoryOutputReceiverFactory.create(
                   bundleFactory, stage.getComponents(), outputs::add),
               StateRequestHandler.unsupported(),
-              BundleProgressHandler.unsupported());
+              BundleProgressHandler.ignored());
       // TODO(BEAM-4680): Add support for timers as inputs to the ULR
       this.mainInput = 
Iterables.getOnlyElement(bundle.getInputReceivers().values());
     }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
index 88fcae1c98a..2993d23db5b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
@@ -130,20 +130,13 @@ private SplittableRemoteStageEvaluator(
                   BundleFactoryOutputReceiverFactory.create(
                       bundleFactory, stage.getComponents(), outputs::add),
                   StateRequestHandler.unsupported(),
+                  // TODO: Wire in splitting via a split listener
                   new BundleProgressHandler() {
                     @Override
-                    public void onProgress(ProcessBundleProgressResponse progress) {
-                      if (progress.hasSplit()) {
-                        feeder.split(progress.getSplit());
-                      }
-                    }
+                    public void onProgress(ProcessBundleProgressResponse progress) {}
 
                     @Override
-                    public void onCompleted(ProcessBundleResponse response) {
-                      if (response.hasSplit()) {
-                        feeder.split(response.getSplit());
-                      }
-                    }
+                    public void onCompleted(ProcessBundleResponse response) {}
                   });
       this.mainInput = 
Iterables.getOnlyElement(bundle.getInputReceivers().values());
     }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 9b5e76244a8..fe6dd9ceeef 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -57,6 +57,7 @@
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -176,6 +177,7 @@ public void splitRange(
   }
 
   @Test
+  @Ignore("TODO: BEAM-3743")
   public void testSDF() throws Exception {
     Pipeline p = Pipeline.create();
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 51acb84034b..17b7e53aef8 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -125,7 +125,7 @@ public void open(Configuration parameters) throws Exception 
{
     stateRequestHandler =
         getStateRequestHandler(
             executableStage, stageBundleFactory.getProcessBundleDescriptor(), 
runtimeContext);
-    progressHandler = BundleProgressHandler.unsupported();
+    progressHandler = BundleProgressHandler.ignored();
   }
 
   private StateRequestHandler getStateRequestHandler(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 5df08459a80..eb6461113ef 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -151,7 +151,7 @@ public void open() throws Exception {
 
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     stateRequestHandler = getStateRequestHandler(executableStage);
-    progressHandler = BundleProgressHandler.unsupported();
+    progressHandler = BundleProgressHandler.ignored();
     outputQueue = new LinkedBlockingQueue<>();
   }
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
index bc7ca38096e..5846bdfaeef 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
@@ -33,22 +33,14 @@
   /** Handles the bundle's completion report. */
   void onCompleted(ProcessBundleResponse response);
 
-  /** Returns a handler that ignores metrics and throws on splits (as splits can not be ignored). */
-  static BundleProgressHandler unsupported() {
+  /** Returns a handler that ignores metrics. */
+  static BundleProgressHandler ignored() {
     return new BundleProgressHandler() {
       @Override
-      public void onProgress(ProcessBundleProgressResponse progress) {
-        if (progress.hasSplit()) {
-          throw new UnsupportedOperationException("Splitting not yet supported");
-        }
-      }
+      public void onProgress(ProcessBundleProgressResponse progress) {}
 
       @Override
-      public void onCompleted(ProcessBundleResponse response) {
-        if (response.hasSplit()) {
-          throw new UnsupportedOperationException("Splitting not yet supported");
-        }
-      }
+      public void onCompleted(ProcessBundleResponse response) {}
     };
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
index d1d67a0367f..deef29ad496 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
@@ -23,8 +23,8 @@
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -45,7 +45,7 @@
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
-import org.joda.time.Duration;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.util.Timestamps;
 import org.joda.time.Instant;
 
 /**
@@ -76,7 +76,8 @@
   private WatermarkHoldState holdState;
 
   private Instant inputTimestamp;
-  private BundleSplit split;
+  private List<BundleApplication> primaryRoots;
+  private List<DelayedBundleApplication> residualRoots;
 
   /** Initializes the feeder. */
   public SDFFeederViaStateAndTimers(
@@ -119,7 +120,7 @@ public void seed(WindowedValue<KV<InputT, RestrictionT>> elementRestriction) {
    * and sets a wake-up timer if a checkpoint happened.
    */
   public void commit() throws IOException {
-    if (split == null) {
+    if (primaryRoots == null) {
       // No split - the call terminated.
       seedState.clear();
       restrictionState.clear();
@@ -128,9 +129,8 @@ public void commit() throws IOException {
     }
 
     // For now can only happen on the first instruction which is SPLITTABLE_PROCESS_ELEMENTS.
-    List<DelayedApplication> residuals = split.getResidualRootsList();
-    checkArgument(residuals.size() == 1, "More than 1 residual is unsupported for now");
-    DelayedApplication residual = residuals.get(0);
+    checkArgument(residualRoots.size() == 1, "More than 1 residual is unsupported for now");
+    DelayedBundleApplication residual = residualRoots.get(0);
 
     ByteString encodedResidual = residual.getApplication().getElement();
     WindowedValue<KV<InputT, RestrictionT>> decodedResidual =
@@ -151,8 +151,12 @@ public void commit() throws IOException {
         inputTimestamp);
     holdState.add(watermarkHold);
 
-    Duration resumeDelay = Duration.millis((long) (1000L * residual.getDelaySec()));
-    Instant wakeupTime = timerInternals.currentProcessingTime().plus(resumeDelay);
+    Instant requestedWakeupTime =
+        new Instant(Timestamps.toMillis(residual.getRequestedExecutionTime()));
+    Instant wakeupTime =
+        timerInternals.currentProcessingTime().isBefore(requestedWakeupTime)
+            ? requestedWakeupTime
+            : timerInternals.currentProcessingTime();
 
     // Set a timer to continue processing this element.
     timerInternals.setTimer(
@@ -160,13 +164,18 @@ public void commit() throws IOException {
   }
 
   /** Signals that a split happened. */
-  public void split(BundleSplit split) {
+  public void split(
+      List<BundleApplication> primaryRoots, List<DelayedBundleApplication> residualRoots) {
     checkState(
-        this.split == null,
-        "At most 1 split supported, however got new split %s in addition to existing %s",
-        split,
-        this.split);
-    this.split = split;
+        this.primaryRoots == null,
+        "At most 1 split supported, however got new split (%s, %s) "
+            + "in addition to existing (%s, %s)",
+        primaryRoots,
+        residualRoots,
+        this.primaryRoots,
+        this.residualRoots);
+    this.primaryRoots = primaryRoots;
+    this.residualRoots = residualRoots;
   }
 
   private void initState(StateNamespace ns) {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 3bbe04e5a64..69a604fb033 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -263,7 +263,7 @@ public void process(ProcessContext ctxt) {
     // The impulse example
 
     try (ActiveBundle bundle =
-        processor.newBundle(outputReceivers, 
BundleProgressHandler.unsupported())) {
+        processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) 
{
       Iterables.getOnlyElement(bundle.getInputReceivers().values())
           .accept(WindowedValue.valueInGlobalWindow(new byte[0]));
     }
@@ -374,7 +374,7 @@ public void processElement(ProcessContext context) {
                 };
               }
             });
-    BundleProgressHandler progressHandler = 
BundleProgressHandler.unsupported();
+    BundleProgressHandler progressHandler = BundleProgressHandler.ignored();
 
     try (ActiveBundle bundle =
         processor.newBundle(outputReceivers, stateRequestHandler, 
progressHandler)) {
@@ -528,7 +528,7 @@ public void clear(K key, W window) {
 
     try (ActiveBundle bundle =
         processor.newBundle(
-            outputReceivers, stateRequestHandler, 
BundleProgressHandler.unsupported())) {
+            outputReceivers, stateRequestHandler, 
BundleProgressHandler.ignored())) {
       Iterables.getOnlyElement(bundle.getInputReceivers().values())
           .accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y")));
     }
@@ -671,9 +671,7 @@ public void processingTimer(
 
     try (ActiveBundle bundle =
         processor.newBundle(
-            outputReceivers,
-            StateRequestHandler.unsupported(),
-            BundleProgressHandler.unsupported())) {
+            outputReceivers, StateRequestHandler.unsupported(), 
BundleProgressHandler.ignored())) {
       bundle
           .getInputReceivers()
           .get(stage.getInputPCollection().getId())
@@ -794,7 +792,7 @@ public void process(ProcessContext c) {
           processor.newBundle(
               outputReceivers,
               StateRequestHandler.unsupported(),
-              BundleProgressHandler.unsupported())) {
+              BundleProgressHandler.ignored())) {
         bundle
             .getInputReceivers()
             .get(stage.getInputPCollection().getId())
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 82b734cdbdc..f875b78659b 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -236,7 +236,7 @@ public void testNewBundleNoDataDoesNotCrash() throws 
Exception {
     when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
 
     try (ActiveBundle activeBundle =
-        processor.newBundle(Collections.emptyMap(), 
BundleProgressHandler.unsupported())) {
+        processor.newBundle(Collections.emptyMap(), 
BundleProgressHandler.ignored())) {
       // Correlating the ProcessBundleRequest and ProcessBundleResponse is 
owned by the underlying
       // FnApiControlClient. The SdkHarnessClient owns just wrapping the 
request and unwrapping
       // the response.
@@ -271,7 +271,7 @@ public void testNewBundleAndProcessElements() throws 
Exception {
                     FullWindowedValueCoder.of(
                         LengthPrefixCoder.of(StringUtf8Coder.of()), 
Coder.INSTANCE),
                     outputs::add)),
-            BundleProgressHandler.unsupported())) {
+            BundleProgressHandler.ignored())) {
       FnDataReceiver<WindowedValue<?>> bundleInputReceiver =
           Iterables.getOnlyElement(activeBundle.getInputReceivers().values());
       bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("foo"));
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index 6ade4bbe234..92a966fd6a1 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -30,8 +30,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.Context;
 import org.apache.beam.fn.harness.state.FnApiStateAccessor;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
 import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableProcessElementInvoker;
@@ -53,6 +53,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.util.Timestamps;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -227,14 +228,14 @@ public void outputWindowedValue(
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      Application primaryApplication =
-          Application.newBuilder()
+      BundleApplication primaryApplication =
+          BundleApplication.newBuilder()
               .setPtransformId(context.ptransformId)
               .setInputId(mainInputId)
               .setElement(primaryBytes.toByteString())
               .build();
-      Application residualApplication =
-          Application.newBuilder()
+      BundleApplication residualApplication =
+          BundleApplication.newBuilder()
               .setPtransformId(context.ptransformId)
               .setInputId(mainInputId)
               .setElement(residualBytes.toByteString())
@@ -242,9 +243,12 @@ public void outputWindowedValue(
       context.splitListener.split(
           ImmutableList.of(primaryApplication),
           ImmutableList.of(
-              DelayedApplication.newBuilder()
+              DelayedBundleApplication.newBuilder()
                   .setApplication(residualApplication)
-                  .setDelaySec(0.001 * 
result.getContinuation().resumeDelay().getMillis())
+                  .setRequestedExecutionTime(
+                      Timestamps.fromMillis(
+                          System.currentTimeMillis()
+                              + 
result.getContinuation().resumeDelay().getMillis()))
                   .build()));
     }
   }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
index 5e6ba701e2f..9eab2459ead 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
@@ -18,8 +18,8 @@
 package org.apache.beam.fn.harness.control;
 
 import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
 
 /**
  * Listens to splits happening to a single bundle. See <a
@@ -36,5 +36,5 @@
    * are a decomposition of work that has been given away by the bundle, so 
the runner must delegate
    * it for someone else to execute.
    */
-  void split(List<Application> primaryRoots, List<DelayedApplication> 
residualRoots);
+  void split(List<BundleApplication> primaryRoots, 
List<DelayedBundleApplication> residualRoots);
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 9b9ed6c249e..547a85932ee 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -44,9 +44,8 @@
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
@@ -236,19 +235,19 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
                 beamFnStateGrpcClientCache.forApiServiceDescriptor(
                     bundleDescriptor.getStateApiServiceDescriptor()))
             : new FailAllStateCallsForBundle(request.getProcessBundle())) {
-      Multimap<String, Application> allPrimaries = ArrayListMultimap.create();
-      Multimap<String, DelayedApplication> allResiduals = 
ArrayListMultimap.create();
+      Multimap<String, BundleApplication> allPrimaries = 
ArrayListMultimap.create();
+      Multimap<String, DelayedBundleApplication> allResiduals = 
ArrayListMultimap.create();
       BundleSplitListener splitListener =
-          (List<Application> primaries, List<DelayedApplication> residuals) -> 
{
+          (List<BundleApplication> primaries, List<DelayedBundleApplication> 
residuals) -> {
             // Reset primaries and accumulate residuals.
-            Multimap<String, Application> newPrimaries = 
ArrayListMultimap.create();
-            for (Application primary : primaries) {
+            Multimap<String, BundleApplication> newPrimaries = 
ArrayListMultimap.create();
+            for (BundleApplication primary : primaries) {
               newPrimaries.put(primary.getPtransformId(), primary);
             }
             allPrimaries.clear();
             allPrimaries.putAll(newPrimaries);
 
-            for (DelayedApplication residual : residuals) {
+            for (DelayedBundleApplication residual : residuals) {
               allResiduals.put(residual.getApplication().getPtransformId(), 
residual);
             }
           };
@@ -290,12 +289,8 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
         LOG.debug("Finishing function {}", finishFunction);
         finishFunction.run();
       }
-      if (!allPrimaries.isEmpty()) {
-        response.setSplit(
-            BundleSplit.newBuilder()
-                .addAllPrimaryRoots(allPrimaries.values())
-                .addAllResidualRoots(allResiduals.values())
-                .build());
+      if (!allResiduals.isEmpty()) {
+        response.addAllResidualRoots(allResiduals.values());
       }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 164439)
    Time Spent: 8h 50m  (was: 8h 40m)

> Proto changes for splitting over Fn API
> ---------------------------------------
>
>                 Key: BEAM-3741
>                 URL: https://issues.apache.org/jira/browse/BEAM-3741
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-model
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to