[ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94358
 ]

ASF GitHub Bot logged work on BEAM-3327:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/18 23:03
            Start Date: 23/Apr/18 23:03
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183562828
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java
 ##########
 @@ -17,16 +17,58 @@
  */
 package org.apache.beam.runners.fnexecution.control;
 
-import org.apache.beam.sdk.fn.function.ThrowingConsumer;
-import org.apache.beam.sdk.util.ThrowingSupplier;
+import javax.annotation.concurrent.ThreadSafe;
 
-/** Control client pool that exposes a source and sink of control clients. */
-public interface ControlClientPool<T extends InstructionRequestHandler> {
+/**
+ * A pool of control clients that brokers incoming SDK harness connections (in the form of {@link
+ * InstructionRequestHandler InstructionRequestHandlers}.
+ *
+ * <p>Incoming instruction handlers usually come from the control plane gRPC service. Typical use:
+ *
+ * <pre>
+ *   // Within owner of the pool, who may or may not own the control plane server as well
+ *   ControlClientPool pool = ...
+ *   FnApiControlClientPoolService service =
+ *       FnApiControlClientPoolService.offeringClientsToSink(pool.getSink(), headerAccessor)
+ *   // Incoming gRPC control connections will now be added to the client pool.
+ *
+ *   // Within code that interacts with the instruction handler. The get call blocks until an
+ *   // incoming client is available:
+ *   ControlClientSource clientSource = ... InstructionRequestHandler
+ *   instructionHandler = clientSource.get("worker-id");
+ * </pre>
+ *
+ * <p>All {@link ControlClientPool} must be thread-safe.
+ */
+@ThreadSafe
+public interface ControlClientPool {
+
+  /** Sink for control clients. */
+  Sink getSink();
 
   /** Source of control clients. */
-  ThrowingSupplier<T> getSource();
+  Source getSource();
 
-  /** Sink for control clients. */
-  ThrowingConsumer<T> getSink();
+  /** A sink for {@link InstructionRequestHandler InstructionRequestHandlers} keyed by worker id. */
+  @FunctionalInterface
+  interface Sink {
+
+    /**
+     * Puts an {@link InstructionRequestHandler} into a client pool. Worker ids must be unique per
+     * pool.
+     */
+    void put(String workerId, InstructionRequestHandler instructionHandler) throws Exception;
+  }
+
+  /** A source of {@link InstructionRequestHandler InstructionRequestHandlers}. */
+  @FunctionalInterface
+  interface Source {
 
+    /**
+     * Retrieves the {@link InstructionRequestHandler} for the given worker id, blocking until
+     * available. Worker ids must be unique per pool. A given worker id must not be requested
 
 Review comment:
   This gives the impression that if the worker never becomes available, the 
call never returns. Is this the case?
   
   Also, a minor note: these semantics (a single get() and put() per workerId)
make me think that "offer/remove" might be better terminology than "get/put",
or at least that "remove" would be better than "get".
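
To make both points concrete, here is a minimal sketch of what a bounded wait and the "offer/remove" naming could look like. It is an illustration only, not the code in this PR: `TimedClientPool`, its generic handler type `H`, and the timeout parameters are all hypothetical names chosen for the example.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch (not part of the PR): a pool with single offer/remove per worker id. */
class TimedClientPool<H> {
  // One slot per worker id; the slot is completed when the handler is offered.
  private final ConcurrentHashMap<String, CompletableFuture<H>> slots = new ConcurrentHashMap<>();
  // Worker ids that have already been requested (successfully or in progress).
  private final Set<String> claimed = ConcurrentHashMap.newKeySet();

  /** Offers a handler for the given worker id. Each worker id may be offered only once. */
  void offer(String workerId, H handler) {
    CompletableFuture<H> slot = slots.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    if (!slot.complete(handler)) {
      throw new IllegalStateException("Worker id already offered: " + workerId);
    }
  }

  /**
   * Removes the handler for the given worker id, waiting at most the given timeout.
   * Throws TimeoutException instead of blocking forever if the worker never connects.
   */
  H remove(String workerId, long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException {
    if (!claimed.add(workerId)) {
      throw new IllegalStateException("Worker id already requested: " + workerId);
    }
    boolean success = false;
    try {
      H handler =
          slots.computeIfAbsent(workerId, id -> new CompletableFuture<>()).get(timeout, unit);
      success = true;
      return handler;
    } catch (ExecutionException e) {
      // Not expected here: slots are never completed exceptionally in this sketch.
      throw new IllegalStateException(e.getCause());
    } finally {
      if (!success) {
        // Release the claim so the caller may retry after a timeout or interrupt.
        claimed.remove(workerId);
      }
    }
  }
}
```

With a signature like this, the caller decides how long to wait, and the javadoc can state the timeout behavior explicitly instead of just "blocking until available".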

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94358)
    Time Spent: 20h  (was: 19h 50m)

> Add abstractions to manage Environment Instance lifecycles.
> -----------------------------------------------------------
>
>                 Key: BEAM-3327
>                 URL: https://issues.apache.org/jira/browse/BEAM-3327
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Ben Sidhom
>            Priority: Major
>              Labels: portability
>          Time Spent: 20h
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
