scwhittle commented on code in PR #32272:
URL: https://github.com/apache/beam/pull/32272#discussion_r1725640273
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java:
##########
@@ -131,6 +136,11 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>, OutputT> crea
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
+ if (this.ses == null) {
Review Comment:
Is this factory possibly called concurrently? If so, the lazy
initialization of `ses` might need some synchronization.
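
To illustrate the concern, here is a minimal sketch of one way to guard a lazily created executor. `LazyExecutorHolder`, `getOrCreate`, and `shutdown` are hypothetical names for illustration only; this is not the actual factory code, and it assumes the field is only ever set through this method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical stand-in for the factory; not the real Beam class.
class LazyExecutorHolder {
  private ScheduledExecutorService ses;

  // synchronized makes the null check and the assignment one atomic step,
  // so two concurrent callers cannot each create their own executor.
  synchronized ScheduledExecutorService getOrCreate() {
    if (ses == null) {
      ses = Executors.newSingleThreadScheduledExecutor();
    }
    return ses;
  }

  // Releases the executor if one was created.
  synchronized void shutdown() {
    if (ses != null) {
      ses.shutdownNow();
      ses = null;
    }
  }
}
```

If the factory is only ever invoked from one thread, the unsynchronized check is fine and this is unnecessary.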
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java:
##########
@@ -131,6 +136,11 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT,
RestrictionT>>, OutputT> crea
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
+ if (this.ses == null) {
+ this.ses =
+ Executors.newSingleThreadScheduledExecutor(
Review Comment:
I don't think we want a single-threaded executor here, because I believe the
factory vends many different DoFnRunners, which will want some parallel threads
for splits.
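
A rough sketch of what a shared multi-threaded scheduler could look like, assuming a fixed pool size is acceptable. `SharedSplitExecutor`, `create`, `runsConcurrently`, and the thread name are hypothetical and chosen only for this example; the pool size would need to be picked for the worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of Beam.
class SharedSplitExecutor {
  // Builds a scheduled pool with daemon threads so it does not block JVM exit.
  static ScheduledExecutorService create(int threads) {
    ThreadFactory daemonFactory =
        r -> {
          Thread t = new Thread(r, "split-scheduler");
          t.setDaemon(true);
          return t;
        };
    return Executors.newScheduledThreadPool(threads, daemonFactory);
  }

  // Returns true only if `tasks` scheduled tasks were all running at the same
  // time, which cannot happen on a pool with fewer than `tasks` threads.
  static boolean runsConcurrently(int threads, int tasks) {
    ScheduledExecutorService ses = create(threads);
    CountDownLatch allStarted = new CountDownLatch(tasks);
    CountDownLatch done = new CountDownLatch(tasks);
    try {
      for (int i = 0; i < tasks; i++) {
        ses.schedule(
            () -> {
              allStarted.countDown();
              try {
                // Each task waits until every other task has started too.
                if (allStarted.await(2, TimeUnit.SECONDS)) {
                  done.countDown();
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            },
            0,
            TimeUnit.MILLISECONDS);
      }
      return done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      ses.shutdownNow();
    }
  }
}
```

With a single-threaded executor, scheduled split callbacks from different runners would queue behind one another; a pool lets them overlap.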
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java:
##########
@@ -137,6 +141,12 @@ public void open(
isBounded,
pipelineOptions);
+ if (this.ses == null) {
+ this.ses =
+ Executors.newSingleThreadScheduledExecutor(
Review Comment:
This executor is not yet used below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.