[ 
https://issues.apache.org/jira/browse/BEAM-7600?focusedWorklogId=306799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306799
 ]

ASF GitHub Bot logged work on BEAM-7600:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Sep/19 00:44
            Start Date: 05/Sep/19 00:44
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on pull request #9095: [BEAM-7600] borrow SDK harness management code into Spark runner
URL: https://github.com/apache/beam/pull/9095#discussion_r321029943
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
 ##########
 @@ -26,42 +26,46 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.function.ThrowingFunction;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link FlinkExecutableStageContext.Factory} which counts 
FlinkExecutableStageContext reference
- * for book keeping.
+ * {@link ExecutableStageContext.Factory} which counts ExecutableStageContext 
reference for book
+ * keeping.
  */
-public class ReferenceCountingFlinkExecutableStageContextFactory
-    implements FlinkExecutableStageContext.Factory {
+public class ReferenceCountingExecutableStageContextFactory
+    implements ExecutableStageContext.Factory {
   private static final Logger LOG =
-      
LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
+      
LoggerFactory.getLogger(ReferenceCountingExecutableStageContextFactory.class);
   private static final int MAX_RETRY = 3;
 
   private final Creator creator;
   private transient volatile ScheduledExecutorService executor;
   private transient volatile ConcurrentHashMap<String, WrappedContext> 
keyRegistry;
+  private final SerializableFunction<Object, Boolean> isReleaseSynchronous;
 
-  public static ReferenceCountingFlinkExecutableStageContextFactory 
create(Creator creator) {
-    return new ReferenceCountingFlinkExecutableStageContextFactory(creator);
+  public static ReferenceCountingExecutableStageContextFactory create(
+      Creator creator, SerializableFunction<Object, Boolean> 
isReleaseSynchronous) {
+    return new ReferenceCountingExecutableStageContextFactory(creator, 
isReleaseSynchronous);
   }
 
-  private ReferenceCountingFlinkExecutableStageContextFactory(Creator creator) 
{
+  private ReferenceCountingExecutableStageContextFactory(
+      Creator creator, SerializableFunction<Object, Boolean> 
isReleaseSynchronous) {
     this.creator = creator;
+    this.isReleaseSynchronous = isReleaseSynchronous;
   }
 
   @Override
-  public FlinkExecutableStageContext get(JobInfo jobInfo) {
+  public ExecutableStageContext get(
+      JobInfo jobInfo, SerializableFunction<Object, Boolean> 
isReleaseSynchronous) {
 
 Review comment:
   The `isReleaseSynchronous` parameter is not used.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 306799)
    Time Spent: 6h 10m  (was: 6h)

> Spark portable runner: reuse SDK harness
> ----------------------------------------
>
>                 Key: BEAM-7600
>                 URL: https://issues.apache.org/jira/browse/BEAM-7600
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: Major
>              Labels: portability-spark
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Right now, we create a new SDK harness every time an executable stage is 
> run [1], which is expensive. We should be able to borrow code from the Flink 
> runner so that the SDK harness is reused [2].
>  
> [1] [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java#L135]
> [2] [https://github.com/apache/beam/blob/c9fb261bc7666788402840bb6ce1b0ce2fd445d1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to