This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d38f577624e Merge pull request #23795: Revert 23234: issue #23794 d38f577624e is described below commit d38f577624eaf8b5f4e31fec43ca8cfba132a132 Author: Reuven Lax <re...@google.com> AuthorDate: Sat Oct 22 00:09:20 2022 -0700 Merge pull request #23795: Revert 23234: issue #23794 --- .../apache/beam/sdk/options/ExecutorOptions.java | 59 ++++++++++++++++++++++ .../sdk/extensions/gcp/options/GcsOptions.java | 29 +++++------ 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java new file mode 100644 index 00000000000..2037d217422 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; + +/** + * Options for configuring the {@link ScheduledExecutorService} used throughout the Java runtime. + */ +public interface ExecutorOptions extends PipelineOptions { + + /** + * The {@link ScheduledExecutorService} instance to use to create threads, can be overridden to + * specify a {@link ScheduledExecutorService} that is compatible with the user's environment. If + * unset, the default is to create an {@link UnboundedScheduledExecutorService}. + */ + @JsonIgnore + @Description( + "The ScheduledExecutorService instance to use to create threads, can be overridden to specify " + + "a ScheduledExecutorService that is compatible with the user's environment. If unset, " + + "the default is to create an UnboundedScheduledExecutorService.") + @Default.InstanceFactory(ScheduledExecutorServiceFactory.class) + @Hidden + ScheduledExecutorService getScheduledExecutorService(); + + void setScheduledExecutorService(ScheduledExecutorService value); + + /** Returns the default {@link ScheduledExecutorService} to use within the Apache Beam SDK. */ + class ScheduledExecutorServiceFactory implements DefaultValueFactory<ScheduledExecutorService> { + @Override + public ScheduledExecutorService create(PipelineOptions options) { + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return new UnboundedScheduledExecutorService(); + } + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 0b14b244da5..fea7be7f5c7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -29,10 +29,10 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; import org.checkerframework.checker.nullness.qual.Nullable; /** Options used to configure Google Cloud Storage. */ @@ -48,20 +48,22 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline /** * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the default is to - * create an ExecutorService with an unbounded number of threads; this is compatible with Google - * AppEngine. + * ExecutorService that is compatible with the user's environment. If unset, the default is to use + * {@link ExecutorOptions#getScheduledExecutorService()}. + * + * @deprecated use {@link ExecutorOptions#getScheduledExecutorService()} instead */ @JsonIgnore - @Description( - "The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") @Default.InstanceFactory(ExecutorServiceFactory.class) @Hidden + @Deprecated ExecutorService getExecutorService(); + /** + * @deprecated use {@link ExecutorOptions#setScheduledExecutorService} instead. If set, it may + * result in multiple ExecutorServices, and therefore thread pools, in the runtime. + */ + @Deprecated void setExecutorService(ExecutorService value); /** GCS endpoint to use. If unspecified, uses the default endpoint. */ @@ -132,14 +134,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { @Override public ExecutorService create(PipelineOptions options) { - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - return new UnboundedScheduledExecutorService(); + return options.as(ExecutorOptions.class).getScheduledExecutorService(); } }