[ https://issues.apache.org/jira/browse/BEAM-3104?focusedWorklogId=85452&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85452 ]

ASF GitHub Bot logged work on BEAM-3104:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Mar/18 00:41
            Start Date: 29/Mar/18 00:41
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #4973: [BEAM-3104] Set up state interfaces, wire into SDK harness client.
URL: https://github.com/apache/beam/pull/4973#discussion_r177928358
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##########
 @@ -273,58 +324,120 @@ public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
     return new SdkHarnessClient(fnApiControlClient, fnApiDataService, idGenerator);
   }
 
+  /**
+   * Provides {@link BundleProcessor} that is capable of processing bundles not
+   * containing any state accesses such as:
+   * <ul>
+   *   <li>Side inputs</li>
+   *   <li>User state</li>
+   *   <li>Remote references</li>
+   * </ul>
+   *
+   * <p>Note that bundle processors are cached based upon the
+   * {@link ProcessBundleDescriptor#getId() process bundle descriptor id}.
+   * A previously created instance may be returned.
+   */
   public <T> BundleProcessor<T> getProcessor(
       BeamFnApi.ProcessBundleDescriptor descriptor,
       RemoteInputDestination<WindowedValue<T>> remoteInputDesination) {
-    try {
-      return clientProcessors.get(
-          descriptor.getId(),
-          () ->
-              (BundleProcessor)
-                  register(
-                          Collections.singletonMap(
-                              descriptor, (RemoteInputDestination) remoteInputDesination))
-                      .get(descriptor.getId()));
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
+    checkState(!descriptor.hasStateApiServiceDescriptor(),
+        "The %s cannot support a %s containing a state %s.",
+        BundleProcessor.class.getSimpleName(),
+        BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
+        Endpoints.ApiServiceDescriptor.class.getSimpleName());
+    return getProcessor(descriptor, remoteInputDesination, NoOpStateDelegator.INSTANCE);
+  }
+
+  /**
+   * A {@link StateDelegator} that issues zero state requests to any provided
+   * {@link StateRequestHandler state handlers}.
+   */
+  private static class NoOpStateDelegator implements StateDelegator {
+    private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();
+    @Override
+    public Registration registerForProcessBundleInstructionId(String processBundleInstructionId,
+        StateRequestHandler handler) {
+      return Registration.INSTANCE;
+    }
+
+    /**
+     * The corresponding registration for a {@link NoOpStateDelegator} that does nothing.
+     */
+    private static class Registration implements StateDelegator.Registration {
+      private static final Registration INSTANCE = new Registration();
+
+      @Override
+      public void deregister() {
+      }
+
+      @Override
+      public void abort() {
+      }
     }
   }
 
   /**
-   * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future processing.
+   * Provides {@link BundleProcessor} that is capable of processing bundles containing
+   * state accesses such as:
+   * <ul>
+   *   <li>Side inputs</li>
+   *   <li>User state</li>
+   *   <li>Remote references</li>
+   * </ul>
    *
-   * <p>A client may block on the result future, but may also proceed without blocking.
+   * <p>Note that bundle processors are cached based upon the
+   * {@link ProcessBundleDescriptor#getId() process bundle descriptor id}.
+   * A previously created instance may be returned.
    */
-  public Map<String, BundleProcessor> register(
-      Map<BeamFnApi.ProcessBundleDescriptor, RemoteInputDestination<WindowedValue<?>>>
-          processBundleDescriptors) {
+  public <T> BundleProcessor<T> getProcessor(
+      BeamFnApi.ProcessBundleDescriptor descriptor,
+      RemoteInputDestination<WindowedValue<T>> remoteInputDesination,
+      StateDelegator stateDelegator) {
+    BundleProcessor<T> bundleProcessor = clientProcessors.computeIfAbsent(
 
 Review comment:
   Done and one other place.
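
   For readers following the hunk: the Javadoc above says bundle processors
   are cached by process bundle descriptor id and that a previously created
   instance may be returned. A minimal sketch of that caching behavior
   follows. It is an illustration only, not the code from the pull request:
   ProcessorCacheSketch, Processor, and the created counter are hypothetical
   names, and ConcurrentHashMap is an assumed map type, since the declaration
   of clientProcessors is not shown in this hunk.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these types are the Beam classes from the diff.
class ProcessorCacheSketch {

  /** Stand-in for a bundle processor, identified only by its descriptor id. */
  static final class Processor {
    final String descriptorId;

    Processor(String descriptorId) {
      this.descriptorId = descriptorId;
    }
  }

  // Assumed cache type; keyed by descriptor id as the Javadoc describes.
  private final Map<String, Processor> cache = new ConcurrentHashMap<>();

  // Counts how many processors were actually constructed.
  final AtomicInteger created = new AtomicInteger();

  /** Returns the cached processor for the id, creating it on first request. */
  Processor getProcessor(String descriptorId) {
    return cache.computeIfAbsent(
        descriptorId,
        id -> {
          created.incrementAndGet();
          return new Processor(id);
        });
  }

  public static void main(String[] args) {
    ProcessorCacheSketch client = new ProcessorCacheSketch();
    Processor first = client.getProcessor("descriptor-1");
    Processor second = client.getProcessor("descriptor-1");
    Processor other = client.getProcessor("descriptor-2");

    System.out.println(first == second); // same id: cached instance reused
    System.out.println(first == other);  // different id: separate instance
    System.out.println(client.created.get());
  }
}
```

   One consequence of keying only on the descriptor id is that arguments
   passed on a later call with the same id are ignored, because the mapping
   function runs only when the key is absent.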

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85452)
    Time Spent: 1h 10m  (was: 1h)

> Implement a BeamFnState Service
> -------------------------------
>
>                 Key: BEAM-3104
>                 URL: https://issues.apache.org/jira/browse/BEAM-3104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-core
>            Reporter: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> This needs to use Java methods to handle State Requests, and convert the Java 
> response into an appropriate State API response.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
