mynameborat commented on code in PR #23313:
URL: https://github.com/apache/beam/pull/23313#discussion_r980326677


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java:
##########
@@ -44,14 +44,11 @@ static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions)
               : pipelineOptions.getConfigOverride();
       final JobConfig jobConfig = new JobConfig(new MapConfig(configs));
 
-      // TODO: once Samza supports a better thread pool modle, e.g. thread
-      // per-task/key-range, this can be supported.
+      // Validate that the threadPoolSize is not override in the code
       checkArgument(
           jobConfig.getThreadPoolSize() <= 1,
           JOB_CONTAINER_THREAD_POOL_SIZE
-              + " cannot be configured to"
-              + " greater than 1 for max bundle size: "
-              + pipelineOptions.getMaxBundleSize());
+              + " should be replaced with SamzaPipelineOptions.bundleThreadNum");

Review Comment:
   Just to clarify: assuming `processElement` calls within `OpAdapter` are non-blocking, we don't need parallelism across tasks?
   
   And this configuration increases parallelism within a task, inside the `processElement` call? Is that right?
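   To make the question concrete, here is a minimal sketch of what parallelism within a single task could look like. It assumes the option simply sizes a fixed thread pool that per-element work is submitted to, while the task thread only waits on the returned `CompletionStage`. `BundleThreadPoolSketch` and `processBundle` are hypothetical names for illustration, not code from this PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical illustration: run a per-element function on a bounded pool inside one task. */
class BundleThreadPoolSketch<InT, OutT> implements AutoCloseable {
  private final ExecutorService pool;
  private final Function<InT, OutT> processElement;

  BundleThreadPoolSketch(int bundleThreadNum, Function<InT, OutT> processElement) {
    // bundleThreadNum stands in for the proposed option; values below 1 are clamped to 1 here.
    this.pool = Executors.newFixedThreadPool(Math.max(1, bundleThreadNum));
    this.processElement = processElement;
  }

  /** Submits every element to the pool and returns one stage that completes with all outputs. */
  CompletionStage<Collection<OutT>> processBundle(List<InT> elements) {
    List<CompletableFuture<OutT>> futures = new ArrayList<>();
    for (InT element : elements) {
      futures.add(CompletableFuture.supplyAsync(() -> processElement.apply(element), pool));
    }
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              Collection<OutT> outputs = new ArrayList<>();
              for (CompletableFuture<OutT> future : futures) {
                outputs.add(future.join()); // every future is already complete here
              }
              return outputs;
            });
  }

  @Override
  public void close() {
    pool.shutdown();
  }
}
```

   In this sketch, outputs keep input order because the futures are joined in submission order, even though elements may finish in any order on the pool.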



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java:
##########
@@ -129,4 +134,30 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   long getMaxBundleTimeMs();
 
   void setMaxBundleTimeMs(long maxBundleTimeMs);
+
+  @Description(
+      "The number of threads to run DoFn.processElements in parallel. Used only in non-portable mode.")

Review Comment:
   Do we want to support this in portable mode, or will it only apply to classic (non-portable) mode?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java:
##########
@@ -188,7 +176,31 @@ public void emitWatermark(Instant watermark) {
 
     @Override
     public <T> void emitView(String id, WindowedValue<Iterable<T>> elements) {
-      outputList.add(OpMessage.ofSideInput(id, elements));
+      outputQueue.add(OpMessage.ofSideInput(id, elements));
+    }
+
+    @Override
+    public Collection<OpMessage<OutT>> collectOutput() {
+      final List<OpMessage<OutT>> outputList = new ArrayList<>();
+      OpMessage<OutT> output;
+      while ((output = outputQueue.poll()) != null) {
+        outputList.add(output);
+      }
+      return outputList;
+    }
+
+    @Override
+    public CompletionStage<Collection<OpMessage<OutT>>> collectFuture() {
+      final CompletionStage<Collection<OpMessage<OutT>>> future = outputFuture;
+      outputFuture = null;

Review Comment:
   Does this read-and-reset of `outputFuture` need to be synchronized with `emitFuture(...)`?
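   For illustration, one way to make the read-and-reset atomic with respect to emission is to guard both methods with the same monitor. This is a minimal sketch with a hypothetical `FutureEmitterSketch`; the real `emitFuture(...)` body is not shown in this hunk, so combining successive futures into one is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;

/** Hypothetical emitter: emitFuture and collectFuture share one lock ("this"). */
class FutureEmitterSketch<T> {
  // Guarded by "this". Null means nothing has been emitted since the last collect.
  private CompletionStage<Collection<T>> outputFuture;

  synchronized void emitFuture(CompletionStage<Collection<T>> resultFuture) {
    if (outputFuture == null) {
      outputFuture = resultFuture;
    } else {
      // Merge into a fresh list so neither caller-supplied collection is mutated.
      outputFuture =
          outputFuture.thenCombine(
              resultFuture,
              (left, right) -> {
                Collection<T> merged = new ArrayList<>(left);
                merged.addAll(right);
                return merged;
              });
    }
  }

  /** Takes the pending future and resets it in one atomic step, so no emission is lost. */
  synchronized CompletionStage<Collection<T>> collectFuture() {
    CompletionStage<Collection<T>> future = outputFuture;
    outputFuture = null;
    return future;
  }
}
```

   Without the shared lock, an `emitFuture(...)` call landing between the read and the `null` assignment in `collectFuture()` could be silently dropped.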



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java:
##########
@@ -129,4 +134,30 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   long getMaxBundleTimeMs();
 
   void setMaxBundleTimeMs(long maxBundleTimeMs);
+
+  @Description(
+      "The number of threads to run DoFn.processElements in parallel. Used only in non-portable mode.")
+  @Default.Integer(0)

Review Comment:
   Should this default be 1 instead?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -479,15 +481,15 @@ static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
   }
 
   static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
-    private final List<CompletionStage<WindowedValue<OutT>>> outputFutures;
     private final AtomicBoolean collectorSealed;
+    private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;

Review Comment:
   `outputFuture` needs to be thread-safe now that we have moved away from the synchronized list, especially across `add(..)` and `addAll(..)`.
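   One lock-free option, sketched under assumptions: keep the pending future in an `AtomicReference` and build a new collection at each combine step instead of mutating a shared one. `AtomicFutureCollectorSketch` is hypothetical and does not use the PR's `FutureUtils.combineFutures`. The no-mutation rule matters because `AtomicReference.updateAndGet` may re-run its update function under contention, and a discarded stage would still fire its combine callback.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical lock-free collector: add and addAll both update one AtomicReference. */
class AtomicFutureCollectorSketch<T> {
  private final AtomicReference<CompletionStage<Collection<T>>> outputFuture =
      new AtomicReference<CompletionStage<Collection<T>>>(
          CompletableFuture.<Collection<T>>completedFuture(Collections.<T>emptyList()));

  /** Returns a new list holding both inputs; never mutates either argument. */
  private static <E> Collection<E> concat(Collection<E> left, Collection<E> right) {
    List<E> merged = new ArrayList<>(left.size() + right.size());
    merged.addAll(left);
    merged.addAll(right);
    return merged;
  }

  void add(CompletionStage<T> element) {
    // updateAndGet may retry this function; since concat has no side effects,
    // any stage built by a discarded attempt is harmless.
    outputFuture.updateAndGet(
        current ->
            current.thenCombine(
                element, (collection, value) -> concat(collection, Collections.singletonList(value))));
  }

  void addAll(CompletionStage<Collection<T>> elements) {
    outputFuture.updateAndGet(
        current -> current.thenCombine(elements, (left, right) -> concat(left, right)));
  }

  CompletionStage<Collection<T>> finish() {
    return outputFuture.get();
  }
}
```

   Copying on every combine costs O(n) per element; if contention is expected to be low, guarding `add(..)` and `addAll(..)` with `synchronized` is a simpler alternative.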



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -496,13 +498,24 @@ public void add(CompletionStage<WindowedValue<OutT>> element) {
       checkState(
           !collectorSealed.get(),
           "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
-      outputFutures.add(element);
+      outputFuture =
+          outputFuture.thenCombine(
+              element,
+              (collection, event) -> {
+                collection.add(event);
+                return collection;
+              });
+    }
+
+    @Override
+    public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
+      outputFuture = FutureUtils.combineFutures(outputFuture, elements);

Review Comment:
   Can we add the state check to ensure `addAll(...)` isn't invoked when the collector is sealed?
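   A minimal sketch of the requested guard, assuming a hypothetical `GuardedFutureCollectorSketch` that mirrors the `collectorSealed` flag from the diff. `add(..)` and `addAll(..)` share one `checkNotSealed()` helper, and the check and the update run under the same lock so `finish()` cannot seal the collector in between. The error message is copied from the existing `add(..)` check; everything else is illustrative.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical collector showing one shared sealed-check for add and addAll. */
class GuardedFutureCollectorSketch<T> {
  // Starts sealed: nothing may be added until prepare() is called.
  private final AtomicBoolean collectorSealed = new AtomicBoolean(true);
  // Guarded by "this"; null until prepare() is called.
  private CompletionStage<Collection<T>> outputFuture;

  synchronized void prepare() {
    outputFuture = CompletableFuture.<Collection<T>>completedFuture(Collections.<T>emptyList());
    collectorSealed.set(false);
  }

  /** Shared guard so add() and addAll() reject elements in exactly the same way. */
  private void checkNotSealed() {
    if (collectorSealed.get()) {
      throw new IllegalStateException(
          "Cannot add elements to an unprepared collector. "
              + "Make sure prepare() is invoked before adding elements.");
    }
  }

  synchronized void add(CompletionStage<T> element) {
    checkNotSealed();
    addAll(element.<Collection<T>>thenApply(Collections::singletonList));
  }

  synchronized void addAll(CompletionStage<Collection<T>> elements) {
    checkNotSealed(); // the guard this comment asks for
    outputFuture =
        outputFuture.thenCombine(
            elements,
            (left, right) -> {
              Collection<T> merged = new ArrayList<>(left);
              merged.addAll(right);
              return merged;
            });
  }

  synchronized CompletionStage<Collection<T>> finish() {
    collectorSealed.set(true);
    return outputFuture;
  }
}
```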



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java:
##########
@@ -84,8 +84,6 @@ public final void schedule(Scheduler<KeyedTimerData<K>> timerRegistry) {
 
   @Override
   public synchronized CompletionStage<Collection<OpMessage<OutT>>> apply(OpMessage<InT> message) {

Review Comment:
   Do we still need the `synchronized` keyword on these methods, given that we have removed the shared state and pushed the responsibility to the emitter?
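   To illustrate the trade-off, here is a minimal sketch, assuming the emitter's only shared state is a `ConcurrentLinkedQueue` drained with the same `poll()` loop as `collectOutput()` in the diff. `QueueEmitterSketch` is a hypothetical name. In this sketch, individual `emit` and `collectOutput` calls are thread-safe without any outer lock.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical emitter whose only shared state is a concurrent queue. */
class QueueEmitterSketch<T> {
  private final Queue<T> outputQueue = new ConcurrentLinkedQueue<>();

  /** Safe to call from many threads without external locking. */
  void emit(T output) {
    outputQueue.add(output);
  }

  /** Drains everything currently queued, using the same poll loop as collectOutput(). */
  Collection<T> collectOutput() {
    List<T> outputList = new ArrayList<>();
    T output;
    while ((output = outputQueue.poll()) != null) {
      outputList.add(output);
    }
    return outputList;
  }
}
```

   What the queue alone does not guarantee is that a drained batch belongs to the message that produced it: in this sketch, if two `apply(...)` calls overlapped, one could collect the other's outputs. If `apply(...)` must return exactly its own outputs, that pairing would still need either the lock or a per-call emitter.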



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
