xinyuiscool commented on code in PR #23313:
URL: https://github.com/apache/beam/pull/23313#discussion_r980578000
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptionsValidator.java:
##########
@@ -44,14 +44,11 @@ static void validateBundlingRelatedOptions(SamzaPipelineOptions pipelineOptions)
: pipelineOptions.getConfigOverride();
final JobConfig jobConfig = new JobConfig(new MapConfig(configs));
- // TODO: once Samza supports a better thread pool modle, e.g. thread
- // per-task/key-range, this can be supported.
+ // Validate that the threadPoolSize is not overridden in the code
checkArgument(
jobConfig.getThreadPoolSize() <= 1,
JOB_CONTAINER_THREAD_POOL_SIZE
- + " cannot be configured to"
- + " greater than 1 for max bundle size: "
- + pipelineOptions.getMaxBundleSize());
+ + " should be replaced with SamzaPipelineOptions.bundleThreadNum");
Review Comment:
Yes, in Beam's context, it's parallelism within a bundle. I changed the
message, as well as the description of the pipeline option, to reflect that.
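The check in the hunk above can be approximated without Samza or Guava, which makes the intent easier to see in isolation. This is a minimal sketch, not Beam's validator: it assumes the Samza key behind `JOB_CONTAINER_THREAD_POOL_SIZE` is `job.container.thread.pool.size`, reads it from a plain `Map` instead of `JobConfig`, and treats an unset value as 0. The class and method names are hypothetical.

```java
import java.util.Map;

/**
 * Hypothetical, dependency-free sketch of the thread-pool-size validation.
 * Assumption: the Samza config key is "job.container.thread.pool.size".
 * Not part of Beam or Samza.
 */
final class ThreadPoolSizeValidationSketch {
  static final String JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size";

  /** Rejects configs that set the Samza container thread pool above 1. */
  static void validateThreadPoolSize(Map<String, String> configs) {
    // In this sketch an unset key is treated as 0 (no extra threads).
    int threadPoolSize =
        Integer.parseInt(configs.getOrDefault(JOB_CONTAINER_THREAD_POOL_SIZE, "0"));
    if (threadPoolSize > 1) {
      // Mirrors the new message: parallelism within a bundle should be
      // configured through the Beam option instead of the Samza key.
      throw new IllegalArgumentException(
          JOB_CONTAINER_THREAD_POOL_SIZE
              + " should be replaced with SamzaPipelineOptions.bundleThreadNum");
    }
  }
}
```

The design choice reflected here is that the Samza-level setting is rejected outright rather than silently honored, so users are steered to the single Beam-level knob for intra-bundle parallelism.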
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -479,15 +481,15 @@ static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(
}
static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
- private final List<CompletionStage<WindowedValue<OutT>>> outputFutures;
private final AtomicBoolean collectorSealed;
+ private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;
Review Comment:
Good catch! I added synchronization there.
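The hunk replaces the list of per-element futures with a single accumulated `CompletionStage<Collection<...>>`, and the reply says synchronization was added around it. The sketch below shows one way such a collector could work; it is an assumption-laden illustration, not Beam's `FutureCollectorImpl`. It swaps `WindowedValue<OutT>` for a plain `T`, assumes `synchronized` methods as the synchronization mechanism, and assumes each new future is chained with `thenCombine`. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: fold per-element output futures into one
 * CompletionStage<Collection<T>>. All mutations of outputFuture are
 * synchronized so concurrent add() calls cannot lose a chained stage.
 */
final class FutureCollectorSketch<T> {
  private final AtomicBoolean collectorSealed = new AtomicBoolean(true);
  private CompletionStage<Collection<T>> outputFuture;

  /** Opens the collector for a new bundle with an empty, completed result. */
  synchronized void prepare() {
    outputFuture = CompletableFuture.<Collection<T>>completedFuture(new ArrayList<T>());
    collectorSealed.set(false);
  }

  /** Chains one element's output future onto the accumulated future. */
  synchronized void add(CompletionStage<T> element) {
    if (collectorSealed.get()) {
      throw new IllegalStateException("Cannot add to a sealed collector");
    }
    // Each step waits for the previous step, so values are appended in add() order
    // and the list is never mutated by two stages at once.
    outputFuture =
        outputFuture.thenCombine(
            element,
            (collection, value) -> {
              collection.add(value);
              return collection;
            });
  }

  /** Seals the collector and hands back the combined future for the bundle. */
  synchronized CompletionStage<Collection<T>> finish() {
    collectorSealed.set(true);
    CompletionStage<Collection<T>> result = outputFuture;
    outputFuture = null;
    return result;
  }
}
```

Without the lock, two threads calling `add` could both read the same `outputFuture` and each overwrite it, dropping one element's output; the single-future design makes that race visible, which is presumably what the review comment caught.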
--
This is an automated message from the Apache Git Service.