[beam] branch master updated: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new bc9aa73  [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
     new 702df1b  Merge pull request #8257: [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not

bc9aa73 is described below

commit bc9aa730009909d9c632fce669bff5ce25d9d81a
Author: Jean-Baptiste Onofré
AuthorDate: Tue Apr 9 17:15:21 2019 +0200

    [BEAM-7041] Let the user control if he wants to wrap the provided DataSource as a poolable one or not
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java  | 45 ++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java   | 14 +++
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index e6f2699..8c824a8 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -242,6 +242,8 @@ public class JdbcIO {
     @Nullable
     abstract DataSource getDataSource();

+    abstract boolean isPoolingDataSource();
+
     abstract Builder builder();

     @AutoValue.Builder
@@ -258,14 +260,22 @@ public class JdbcIO {

       abstract Builder setDataSource(DataSource dataSource);

+      abstract Builder setPoolingDataSource(boolean poolingDataSource);
+
       abstract DataSourceConfiguration build();
     }

     public static DataSourceConfiguration create(DataSource dataSource) {
+      return create(dataSource, true);
+    }
+
+    public static DataSourceConfiguration create(
+        DataSource dataSource, boolean isPoolingDataSource) {
       checkArgument(dataSource != null, "dataSource can not be null");
       checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDataSource(dataSource)
+          .setPoolingDataSource(isPoolingDataSource)
           .build();
     }

@@ -284,6 +294,7 @@ public class JdbcIO {
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDriverClassName(driverClassName)
           .setUrl(url)
+          .setPoolingDataSource(true)
           .build();
     }

@@ -356,21 +367,25 @@ public class JdbcIO {
         current = basicDataSource;
       }

-      // wrapping the datasource as a pooling datasource
-      DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
-      PoolableConnectionFactory poolableConnectionFactory =
-          new PoolableConnectionFactory(connectionFactory, null);
-      GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
-      poolConfig.setMaxTotal(1);
-      poolConfig.setMinIdle(0);
-      poolConfig.setMinEvictableIdleTimeMillis(1);
-      poolConfig.setSoftMinEvictableIdleTimeMillis(3);
-      GenericObjectPool connectionPool =
-          new GenericObjectPool(poolableConnectionFactory, poolConfig);
-      poolableConnectionFactory.setPool(connectionPool);
-      poolableConnectionFactory.setDefaultAutoCommit(false);
-      poolableConnectionFactory.setDefaultReadOnly(false);
-      return new PoolingDataSource(connectionPool);
+      if (isPoolingDataSource()) {
+        // wrapping the datasource as a pooling datasource
+        DataSourceConnectionFactory connectionFactory = new DataSourceConnectionFactory(current);
+        PoolableConnectionFactory poolableConnectionFactory =
+            new PoolableConnectionFactory(connectionFactory, null);
+        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
+        poolConfig.setMaxTotal(1);
+        poolConfig.setMinIdle(0);
+        poolConfig.setMinEvictableIdleTimeMillis(1);
+        poolConfig.setSoftMinEvictableIdleTimeMillis(3);
+        GenericObjectPool connectionPool =
+            new GenericObjectPool(poolableConnectionFactory, poolConfig);
+        poolableConnectionFactory.setPool(connectionPool);
+        poolableConnectionFactory.setDefaultAutoCommit(false);
+        poolableConnectionFactory.setDefaultReadOnly(false);
+        return new PoolingDataSource(connectionPool);
+      } else {
+        return current;
+      }
     }
   }

diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 0e9127a..3e45363 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -45,6 +45,7 @@ i
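This commit makes JdbcIO's connection pooling opt-out: the one-argument create keeps the old always-wrap behavior, while the new two-argument overload lets a caller pass false to use the provided DataSource as-is. A minimal usage sketch of the new overload — the wrapper class and the idea of a container-managed pool are illustrative assumptions, not part of the commit:

import javax.sql.DataSource;
import org.apache.beam.sdk.io.jdbc.JdbcIO;

class PoolingOptOutExample {
  // Assumption: the caller supplies a Serializable DataSource that already
  // manages its own connection pool (for example, one obtained from the
  // application container).
  static JdbcIO.DataSourceConfiguration configure(DataSource alreadyPooled) {
    // Passing `false` skips the commons-dbcp2 PoolingDataSource wrapper that
    // buildDatasource() would otherwise apply; `true` (or the one-argument
    // create) keeps the previous wrapping behavior.
    return JdbcIO.DataSourceConfiguration.create(alreadyPooled, false);
  }
}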
[beam] branch master updated: [BEAM-6935] Spark portable runner: implement side inputs
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new c45a50f  [BEAM-6935] Spark portable runner: implement side inputs
     new 16b58ad  Merge pull request #8220: [BEAM-6935] Spark portable runner: implement side inputs

c45a50f is described below

commit c45a50fb092171ce4fa5f8b0758a584911d4f50d
Author: Kyle Weaver
AuthorDate: Thu Mar 28 19:16:51 2019 -0700

    [BEAM-6935] Spark portable runner: implement side inputs
---
 .../functions/FlinkExecutableStageFunction.java     |  4 +-
 .../translation/BatchSideInputHandlerFactory.java}  | 35 ++---
 .../BatchSideInputHandlerFactoryTest.java}          | 40 +++
 .../runners/spark/translation/BoundedDataset.java   |  9
 .../SparkBatchPortablePipelineTranslator.java       | 47 +++--
 .../translation/SparkExecutableStageFunction.java   | 59 +++---
 .../runners/spark/SparkPortableExecutionTest.java   | 36 +
 .../SparkExecutableStageFunctionTest.java           | 15 +++---
 8 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index e7dafa8..c02aa65 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -54,6 +54,7 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.io.FileSystems;
@@ -167,7 +168,8 @@ public class FlinkExecutableStageFunction extends AbstractRichFunction
       RuntimeContext runtimeContext) {
     final StateRequestHandler sideInputHandler;
     StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
-        FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
+        BatchSideInputHandlerFactory.forStage(
+            executableStage, runtimeContext::getBroadcastVariable);
     try {
       sideInputHandler =
           StateRequestHandlers.forSideInputHandlerFactory(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
similarity index 87%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
index 798c32b..5460898 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.translation.functions;
+package org.apache.beam.runners.fnexecution.translation;

 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

@@ -43,24 +43,25 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
-import org.apache.flink.api.common.functions.RuntimeContext;

-/**
- * {@link StateRequestHandler} that uses a Flink {@link RuntimeContext} to access Flink broadcast
- * variable that represent side inputs.
- */
-class FlinkBatchSideInputHandlerFactory implements SideInputHandlerFactory {
+/** {@link StateRequestHandler} that uses a {@link SideInputGetter} to access side inputs. */
+public class BatchSideInputHandlerFactory implements SideInputHandlerFactory {

   // Map from side input id to global PCollection id.
   private final Map sideInputToCollection;
-  private final RuntimeContext runtimeContext;
+  private final SideInputGetter sideInputGetter;
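The diff above cuts off at the new sideInputGetter field, so the shape of the extracted abstraction is easy to miss: BatchSideInputHandlerFactory.forStage now takes a runner-agnostic getter instead of a Flink RuntimeContext, and Flink passes runtimeContext::getBroadcastVariable. A sketch of what that interface plausibly looks like — the member name and generic signature are inferred from the Flink method reference (getBroadcastVariable(String) returns a List<T>), not quoted from the commit:

import java.util.List;

// Hypothetical reconstruction of the runner-agnostic side input accessor
// that replaces the direct RuntimeContext dependency. Because the abstract
// method is generic, implementations are supplied as method references
// (e.g. Flink's runtimeContext::getBroadcastVariable) rather than lambdas.
public interface SideInputGetter {
  <T> List<T> getSideInput(String pCollectionId);
}

Presumably this is what lets the Spark batch translator reuse the same factory: it can hand in a method reference that reads each materialized side input PCollection, as suggested by the SparkBatchPortablePipelineTranslator and SparkExecutableStageFunction entries in the diffstat.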
[beam] branch master updated: Update the spec for extract_output and make it clear the input accumulator is mutable
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 0100149  Update the spec for extract_output and make it clear the input accumulator is mutable
     new cc903a4  Merge pull request #8264 from robinyqiu/typo

0100149 is described below

commit 0100149c44bf07c9ac8dc93fde44289d97d2ede5
Author: Yueyang Qiu
AuthorDate: Tue Apr 9 15:03:41 2019 -0700

    Update the spec for extract_output and make it clear the input accumulator is mutable
---
 .../core/src/main/java/org/apache/beam/sdk/transforms/Combine.java | 2 ++
 sdks/python/apache_beam/transforms/core.py                         | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 54e6f08..7444062 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -359,6 +359,8 @@ public class Combine {
     /**
      * Returns the output value that is the result of combining all the input values represented by
      * the given accumulator.
+     *
+     * @param accumulator can be modified for efficiency
      */
     public abstract OutputT extractOutput(AccumT accumulator);

diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a445152..a159abd 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -667,7 +667,8 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):

     Args:
       accumulator: the final accumulator value computed by this CombineFn
-        for the entire input key or PCollection.
+        for the entire input key or PCollection. Can be modified for
+        efficiency.
       *args: Additional arguments and side inputs.
       **kwargs: Additional arguments and side inputs.
     """
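The spec change is small but consequential for CombineFn authors: extractOutput (and Python's extract_output) may now consume its argument destructively. A made-up combiner illustrating the pattern the new wording permits — this is a sketch, not code from the commit:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

// Hypothetical combiner that collects inputs and emits them sorted. Because
// the spec now guarantees the accumulator "can be modified for efficiency",
// extractOutput may sort the accumulator list in place instead of copying it.
class SortedListFn extends Combine.CombineFn<Integer, List<Integer>, List<Integer>> {
  @Override
  public List<Integer> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<Integer> addInput(List<Integer> accum, Integer input) {
    accum.add(input);
    return accum;
  }

  @Override
  public List<Integer> mergeAccumulators(Iterable<List<Integer>> accums) {
    List<Integer> merged = new ArrayList<>();
    for (List<Integer> accum : accums) {
      merged.addAll(accum);
    }
    return merged;
  }

  @Override
  public List<Integer> extractOutput(List<Integer> accum) {
    Collections.sort(accum); // safe: the accumulator may be modified
    return accum;
  }
}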
[beam] branch master updated: Update dataflow worker container version (#8275)
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 9893e2c  Update dataflow worker container version (#8275)

9893e2c is described below

commit 9893e2c52ee8a0c1e83de043af628c1a09b39892
Author: Boyuan Zhang <36090911+boyua...@users.noreply.github.com>
AuthorDate: Thu Apr 11 09:39:06 2019 -0700

    Update dataflow worker container version (#8275)
---
 runners/google-cloud-dataflow-java/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index c9cefa5..558813c 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -39,7 +39,7 @@ processResources {
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
     'dataflow.legacy_environment_major_version' : '7',
     'dataflow.fnapi_environment_major_version' : '7',
-    'dataflow.container_version' : 'beam-master-20190308'
+    'dataflow.container_version' : 'beam-master-20190410'
   ]
 }
[beam] branch master updated: Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new cbe4dfb  Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)

cbe4dfb is described below

commit cbe4dfbdbe5d0da5152568853ee5e17334dd1b54
Author: Pablo
AuthorDate: Thu Apr 11 11:35:25 2019 -0700

    Refactoring code from direct runner, and adding unit test for processing time timers. (#8271)

    * Small refactor of direct runner code, and adding unit test.

    * Fixing lint issue
---
 sdks/python/apache_beam/runners/common.py           |  8 +--
 .../apache_beam/runners/direct/direct_runner.py     | 11 ++--
 .../runners/direct/evaluation_context.py            | 28 ++
 .../apache_beam/transforms/userstate_test.py        | 59 ++
 4 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index f1fda35..84ac116 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -547,7 +547,7 @@ class PerWindowInvoker(DoFnInvoker):
       try:
         self.current_windowed_value = windowed_value
         self.restriction_tracker = restriction_tracker
-        return self._invoke_per_window(
+        return self._invoke_process_per_window(
            windowed_value, additional_args, additional_kwargs,
            output_processor)
       finally:
@@ -556,14 +556,14 @@ class PerWindowInvoker(DoFnInvoker):

     elif self.has_windowed_inputs and len(windowed_value.windows) != 1:
       for w in windowed_value.windows:
-        self._invoke_per_window(
+        self._invoke_process_per_window(
            WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)),
            additional_args, additional_kwargs, output_processor)
     else:
-      self._invoke_per_window(
+      self._invoke_process_per_window(
          windowed_value, additional_args, additional_kwargs, output_processor)

-  def _invoke_per_window(
+  def _invoke_process_per_window(
       self, windowed_value, additional_args,
       additional_kwargs, output_processor):
     if self.has_windowed_inputs:
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 43e8c7f..e880460 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -69,11 +69,6 @@ class SwitchingDirectRunner(PipelineRunner):
   """

   def run_pipeline(self, pipeline, options):
-    use_fnapi_runner = True
-
-    # Streaming mode is not yet supported on the FnApiRunner.
-    if options.view_as(StandardOptions).streaming:
-      use_fnapi_runner = False

     from apache_beam.pipeline import PipelineVisitor
     from apache_beam.runners.dataflow.native_io.iobase import NativeSource
@@ -113,8 +108,10 @@ class SwitchingDirectRunner(PipelineRunner):
           self.supported_by_fnapi_runner = False

     # Check whether all transforms used in the pipeline are supported by the
-    # FnApiRunner.
-    use_fnapi_runner = _FnApiRunnerSupportVisitor().accept(pipeline)
+    # FnApiRunner, and the pipeline was not meant to be run as streaming.
+    use_fnapi_runner = (
+        _FnApiRunnerSupportVisitor().accept(pipeline)
+        and not options.view_as(StandardOptions).streaming)

     # Also ensure grpc is available.
     try:
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 24b05b6..a042ded 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -274,16 +274,7 @@ class EvaluationContext(object):
           result.logical_metric_updates)

     # If the result is for a view, update side inputs container.
-    if (result.uncommitted_output_bundles
-        and result.uncommitted_output_bundles[0].pcollection
-        in self._pcollection_to_views):
-      for view in self._pcollection_to_views[
-          result.uncommitted_output_bundles[0].pcollection]:
-        for committed_bundle in committed_bundles:
-          # side_input must be materialized.
-          self._side_inputs_container.add_values(
-              view,
-              committed_bundle.get_elements_iterable(make_copy=True))
+    self._update_side_inputs_container(committed_bundles, result)

     # Tasks generated from unblocked side inputs as the watermark progresses.
     tasks = self._watermark_manager.update_watermarks(
@@ -304,6 +295,23 @@ class EvaluationContext(object):
         existing_keyed_state[k] = v
     return committed_bundles

+  def _upd
[beam] branch master updated: [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new f876166  [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
     new 51a13f0  Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only

f876166 is described below

commit f876166ba02a94ea66954a586810f1f15d36d98e
Author: Alex Amato
AuthorDate: Wed Mar 13 17:56:17 2019 -0700

    [BEAM-4374] Add Beam Distribution Accumulator to use in python's counter factory.
---
 .../runners/dataflow/internal/apiclient.py          | 21 ---
 .../runners/dataflow/internal/apiclient_test.py     | 44 +-
 .../python/apache_beam/transforms/cy_combiners.pxd  | 10 +
 sdks/python/apache_beam/transforms/cy_combiners.py  | 36 ++
 sdks/python/apache_beam/utils/counters.py           |  1 +
 5 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e0a1a56..b0b1325 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -829,8 +829,8 @@ def translate_distribution(distribution_update, metric_update_proto):
   """Translate metrics DistributionUpdate to dataflow distribution update.

   Args:
-    distribution_update: Instance of DistributionData or
-    DataflowDistributionCounter.
+    distribution_update: Instance of DistributionData,
+    DistributionInt64Accumulator or DataflowDistributionCounter.
     metric_update_proto: Used for report metrics.
   """
   dist_update_proto = dataflow.DistributionUpdate()
@@ -838,7 +838,7 @@ def translate_distribution(distribution_update, metric_update_proto):
   dist_update_proto.max = to_split_int(distribution_update.max)
   dist_update_proto.count = to_split_int(distribution_update.count)
   dist_update_proto.sum = to_split_int(distribution_update.sum)
-  # DatadflowDistributionCounter needs to translate histogram
+  # DataflowDistributionCounter needs to translate histogram
   if isinstance(distribution_update, DataflowDistributionCounter):
     dist_update_proto.histogram = dataflow.Histogram()
     distribution_update.translate_to_histogram(dist_update_proto.histogram)
@@ -969,6 +969,11 @@ def _verify_interpreter_version_is_supported(pipeline_options):


 # To enable a counter on the service, add it to this dictionary.
+# This is required for the legacy python dataflow runner, as portability
+# does not communicate to the service via python code, but instead via a
+# a runner harness (in C++ or Java).
+# TODO(BEAM-7050) : Remove this antipattern, legacy dataflow python
+# pipelines will break whenever a new cy_combiner type is used.
 structured_counter_translations = {
     cy_combiners.CountCombineFn: (
         dataflow.CounterMetadata.KindValueValuesEnum.SUM,
@@ -1005,7 +1010,10 @@ structured_counter_translations = {
         MetricUpdateTranslators.translate_boolean),
     cy_combiners.DataflowDistributionCounterFn: (
         dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
-        translate_distribution)
+        translate_distribution),
+    cy_combiners.DistributionInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution),
 }


@@ -1045,5 +1053,8 @@ counter_translations = {
         MetricUpdateTranslators.translate_boolean),
     cy_combiners.DataflowDistributionCounterFn: (
         dataflow.NameAndKind.KindValueValuesEnum.DISTRIBUTION,
-        translate_distribution)
+        translate_distribution),
+    cy_combiners.DistributionInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.DISTRIBUTION,
+        translate_distribution),
 }
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 77eba7c..2f65716 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -163,7 +163,24 @@ class UtilTest(unittest.TestCase):
     self.assertEqual((split_number.lowBits, split_number.highBits),
                      (0, number))

-  def test_translate_distribution(self):
+  def test_translate_distribution_using_accumulator(self):
+    metric_update = dataflow.CounterUpdate()
+    accumulator = mock.Mock()
+    accumulator.min = 1
+    accumulator.max = 15
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.translate_distribution(accumulator, metric_update)
+    self.assertEqual(metric_update.distribution.min.lowBits,
+                     accumulator.min)
+    self.assertEqual(metric_update.distribution.max.lowBits,
+
[beam] branch master updated (51a13f0 -> c1841e2)
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 51a13f0  Merge pull request #8272 from ajamato/mean_byte_count_cy_combiner_only
     new 9d4bab5  [BEAM-6677] Pulling job server in beam init action
     new 0419113  [BEAM-6677] Quickfix: Prevent not finding nodes during Flink configuration on Dataproc
     new a5da15f  [BEAM-6677] Add GCLOUD_ZONE and DATAPROC_VERSION for easier customization
     new 1943416  [BEAM-6677] Add JobServer support to create_flink_cluster.sh
     new c1841e2  Merge pull request #8250 from lgajowy/BEAM-6677_flink-job-server-dataproc

The 20941 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions listed
as "add" were already present in the repository and have only been added
to this reference.

Summary of changes:
 .test-infra/dataproc/create_flink_cluster.sh | 45 ++--
 .test-infra/dataproc/init-actions/beam.sh    | 34 ++---
 .test-infra/dataproc/init-actions/flink.sh   |  9 +++---
 3 files changed, 58 insertions(+), 30 deletions(-)
[beam] branch spark-runner_structured-streaming updated (3533c89 -> 573cf0c)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 3533c89  Add source streaming test
     new 6b84c2f  Specify checkpointLocation at the pipeline start
     new dbe88d4  Clean unneeded 0 arg constructor in batch source
     new 5e584ce  Remove unneeded 0 arg constructor in batch source
     new 980a922  Clean streaming source
     new 4562a89  Fllow up on offsets for streaming source
     new 573cf0c  Deal with checkpoint and offset based read

The 19459 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions listed
as "add" were already present in the repository and have only been added
to this reference.

Summary of changes:
 .../structuredstreaming/SparkPipelineOptions.java  |  12 ---
 .../translation/TranslationContext.java            |  14 ++-
 .../translation/batch/DatasetSourceBatch.java      |   2 -
 .../streaming/DatasetSourceStreaming.java          | 111 +
 .../streaming/ReadSourceTranslatorStreaming.java   |  22 +---
 5 files changed, 105 insertions(+), 56 deletions(-)
[beam] branch master updated: [BEAM-7053] prevent errors in Spark options
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new d0f1968  [BEAM-7053] prevent errors in Spark options
     new 8334eb5  Merge pull request #8283 from ibzib/options-exception

d0f1968 is described below

commit d0f19682e62d090fe96175805198f198ed313391
Author: Kyle Weaver
AuthorDate: Wed Apr 10 14:22:25 2019 -0700

    [BEAM-7053] prevent errors in Spark options
---
 .../main/java/org/apache/beam/runners/spark/SparkJobInvoker.java | 8
 1 file changed, 8 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
index cea4b07..e47c851 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
@@ -56,6 +56,14 @@ public class SparkJobInvoker extends JobInvoker {
         String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);

+    // Options can't be translated to proto if runner class is unresolvable, so set it to null.
+    sparkOptions.setRunner(null);
+
+    if (sparkOptions.getAppName() == null) {
+      LOG.debug("App name was null. Using invocationId {}", invocationId);
+      sparkOptions.setAppName(invocationId);
+    }
+
     return createJobInvocation(
         invocationId, retrievalToken, executorService, pipeline, sparkOptions);
   }
[beam] branch master updated (8334eb5 -> e4cf63b)
This is an automated email from the ASF dual-hosted git repository.

markliu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 8334eb5  Merge pull request #8283 from ibzib/options-exception
     new 9cb9175  Add Python 3.6 and 3.7 test suites
     new 92af51a  Skip tests failing on Python 3.7
     new e9d50a0  Deactivate Python 3.6 and 3.7 cython test suites.
     new e4cf63b  Merge pull request #8036 from RobbeSneyders/python-3.6-3.7

The 20947 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions listed
as "add" were already present in the repository and have only been added
to this reference.

Summary of changes:
 build.gradle                                        |  1 +
 .../apache_beam/transforms/ptransform_test.py       |  4 ++
 .../typehints/native_type_compatibility_test.py     |  6 +++
 .../apache_beam/typehints/typed_pipeline_test.py    | 10
 .../python/apache_beam/typehints/typehints_test.py  |  6 +++
 sdks/python/test-suites/tox/py36/build.gradle       | 15 +-
 .../test-suites/tox/{py35 => py37}/build.gradle     | 36 ++
 sdks/python/tox.ini                                 | 56 +-
 settings.gradle                                     |  2 +
 9 files changed, 113 insertions(+), 23 deletions(-)
 copy sdks/python/test-suites/tox/{py35 => py37}/build.gradle (63%)
[beam] branch dot-log created (now c33dd9e)
This is an automated email from the ASF dual-hosted git repository.

jaku pushed a change to branch dot-log
in repository https://gitbox.apache.org/repos/asf/beam.git.

      at c33dd9e  Add the appropriate extension to log files.

This branch includes the following new commits:

     new c33dd9e  Add the appropriate extension to log files.

The 1 revisions listed above as "new" are entirely new to this repository
and will be described in separate emails. The revisions listed as "add"
were already present in the repository and have only been added to this
reference.
[beam] 01/01: Add the appropriate extension to log files.
This is an automated email from the ASF dual-hosted git repository.

jaku pushed a commit to branch dot-log
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c33dd9eae2185014cf4bd6f5bc7ecd6405378ad7
Author: jasonkuster
AuthorDate: Thu Apr 11 17:12:46 2019 -0700

    Add the appropriate extension to log files.

    Files are created with the date string as their extension
    (filename.<date>), which is not a valid filetype and can cause
    difficulties for people when trying to open logs. Set it to add an
    appropriate extension instead.
---
 .../runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 7428390..0fdb4ae 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -273,7 +273,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
   @Override
   public OutputStream get() {
     try {
-      String filename = filepath + "." + formatter.format(new Date());
+      String filename = filepath + "." + formatter.format(new Date()) + ".log";
       return new BufferedOutputStream(
           new FileOutputStream(new File(filename), true /* append */));
     } catch (IOException e) {
[beam] branch master updated: [BEAM-7046] Restore os.environ in HttpClientTest
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 34a4e37  [BEAM-7046] Restore os.environ in HttpClientTest
     new e93cdf1  Merge pull request #8266 from udim/http_proxy

34a4e37 is described below

commit 34a4e37f10eab0d1d83188739cdf7baf58d2922c
Author: Udi Meiri
AuthorDate: Tue Apr 9 15:57:39 2019 -0700

    [BEAM-7046] Restore os.environ in HttpClientTest

    This was causing warnings about parsing http_proxy in other tests
    that use grpc.
---
 .../apache_beam/internal/http_client_test.py | 87 +++---
 1 file changed, 44 insertions(+), 43 deletions(-)

diff --git a/sdks/python/apache_beam/internal/http_client_test.py b/sdks/python/apache_beam/internal/http_client_test.py
index 460b1db..9755edf 100644
--- a/sdks/python/apache_beam/internal/http_client_test.py
+++ b/sdks/python/apache_beam/internal/http_client_test.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 import os
 import unittest

+import mock
 from httplib2 import ProxyInfo

 from apache_beam.internal.http_client import DEFAULT_HTTP_TIMEOUT_SECONDS
@@ -31,52 +32,52 @@ from apache_beam.internal.http_client import proxy_info_from_environment_var
 class HttpClientTest(unittest.TestCase):

   def test_proxy_from_env_http_with_port(self):
-    os.environ['http_proxy'] = 'http://localhost:9000'
-    proxy_info = proxy_info_from_environment_var('http_proxy')
-    expected = ProxyInfo(3, 'localhost', 9000)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, http_proxy='http://localhost:9000'):
+      proxy_info = proxy_info_from_environment_var('http_proxy')
+      expected = ProxyInfo(3, 'localhost', 9000)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_https_with_port(self):
-    os.environ['https_proxy'] = 'https://localhost:9000'
-    proxy_info = proxy_info_from_environment_var('https_proxy')
-    expected = ProxyInfo(3, 'localhost', 9000)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, https_proxy='https://localhost:9000'):
+      proxy_info = proxy_info_from_environment_var('https_proxy')
+      expected = ProxyInfo(3, 'localhost', 9000)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_http_without_port(self):
-    os.environ['http_proxy'] = 'http://localhost'
-    proxy_info = proxy_info_from_environment_var('http_proxy')
-    expected = ProxyInfo(3, 'localhost', 80)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, http_proxy='http://localhost'):
+      proxy_info = proxy_info_from_environment_var('http_proxy')
+      expected = ProxyInfo(3, 'localhost', 80)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_https_without_port(self):
-    os.environ['https_proxy'] = 'https://localhost'
-    proxy_info = proxy_info_from_environment_var('https_proxy')
-    expected = ProxyInfo(3, 'localhost', 443)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, https_proxy='https://localhost'):
+      proxy_info = proxy_info_from_environment_var('https_proxy')
+      expected = ProxyInfo(3, 'localhost', 443)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_http_without_method(self):
-    os.environ['http_proxy'] = 'localhost:8000'
-    proxy_info = proxy_info_from_environment_var('http_proxy')
-    expected = ProxyInfo(3, 'localhost', 8000)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, http_proxy='localhost:8000'):
+      proxy_info = proxy_info_from_environment_var('http_proxy')
+      expected = ProxyInfo(3, 'localhost', 8000)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_https_without_method(self):
-    os.environ['https_proxy'] = 'localhost:8000'
-    proxy_info = proxy_info_from_environment_var('https_proxy')
-    expected = ProxyInfo(3, 'localhost', 8000)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, https_proxy='localhost:8000'):
+      proxy_info = proxy_info_from_environment_var('https_proxy')
+      expected = ProxyInfo(3, 'localhost', 8000)
+      self.assertEquals(str(expected), str(proxy_info))

   def test_proxy_from_env_http_without_port_without_method(self):
-    os.environ['http_proxy'] = 'localhost'
-    proxy_info = proxy_info_from_environment_var('http_proxy')
-    expected = ProxyInfo(3, 'localhost', 80)
-    self.assertEquals(str(expected), str(proxy_info))
+    with mock.patch.dict(os.environ, http_proxy='localhost'):
+      proxy_info = proxy_info_from_environment_var('http_proxy')
+      expected = ProxyInfo(3, 'localhost', 80)
+      self.assertEqu
[beam] branch master updated: [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 540327e  [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)

540327e is described below

commit 540327eab201ede710681cd07de8f8105a506730
Author: xinyuiscool
AuthorDate: Thu Apr 11 18:29:32 2019 -0700

    [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
---
 .../beam/runners/samza/SamzaExecutionContext.java  | 33 --
 .../org/apache/beam/runners/samza/SamzaRunner.java |  5 ++--
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 10 ++-
 .../runners/samza/translation/ConfigBuilder.java   |  2 ++
 4 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
index af65135..0867e51 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
@@ -59,6 +59,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
   private GrpcFnServer fnDataServer;
   private GrpcFnServer fnStateServer;
   private ControlClientPool controlClientPool;
+  private ExecutorService dataExecutor;
   private IdGenerator idGenerator = IdGenerators.incrementingLongs();

   public SamzaExecutionContext(SamzaPipelineOptions options) {
@@ -92,7 +93,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
     if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
       try {
         controlClientPool = MapControlClientPool.create();
-        final ExecutorService dataExecutor = Executors.newCachedThreadPool();
+        dataExecutor = Executors.newCachedThreadPool();
         fnControlServer =
             GrpcFnServer.allocatePortAndCreateFor(
@@ -100,18 +101,23 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
                 controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
             ServerFactory.createWithPortSupplier(
                 () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+        LOG.info("Started control server on port {}", fnControlServer.getServer().getPort());
         fnDataServer =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
                 ServerFactory.createDefault());
+        LOG.info("Started data server on port {}", fnDataServer.getServer().getPort());
         fnStateServer =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcStateService.create(), ServerFactory.createDefault());
+        LOG.info("Started state server on port {}", fnStateServer.getServer().getPort());

         final long waitTimeoutMs =
             SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+        LOG.info("Control client wait timeout config: " + waitTimeoutMs);
+
         final InstructionRequestHandler instructionHandler =
             controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
         final EnvironmentFactory environmentFactory =
@@ -120,6 +126,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
         jobBundleFactory =
             SingleEnvironmentInstanceJobBundleFactory.create(
                 environmentFactory, fnDataServer, fnStateServer, idGenerator);
+        LOG.info("Started job bundle factory");
       } catch (Exception e) {
         throw new RuntimeException(
             "Running samza in Beam portable mode but failed to create job bundle factory", e);
@@ -131,19 +138,29 @@ public class SamzaExecutionContext implements ApplicationContainerContext {

   @Override
   public void stop() {
-    closeFnServer(fnControlServer);
+    closeAutoClosable(fnControlServer, "controlServer");
     fnControlServer = null;
-    closeFnServer(fnDataServer);
+    closeAutoClosable(fnDataServer, "dataServer");
     fnDataServer = null;
-    closeFnServer(fnStateServer);
+    closeAutoClosable(fnStateServer, "stateServer");
     fnStateServer = null;
+    if (dataExecutor != null) {
+      dataExecutor.shutdown();
+      dataExecutor = null;
+    }
+    controlClientPool = null;
+    closeAutoClosable(jobBundleFactory, "jobBundle");
+    jobBundleFactory = null;
   }

-  private void closeFnServer(GrpcFnServer fnServer) {
-    try (AutoCloseable closer = fnServer) {
-      // do nothing
+  private static void closeAutoClosable(AutoCloseable closeable, String name) {
+    try (AutoCloseable closer = closeable) {
+      LOG.info("Closed {
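The email is cut off inside the new closeAutoClosable helper, but the idiom it switches to is visible: close each resource through try-with-resources, with a log line per resource. A sketch of that idiom — everything after the truncated LOG.info("Closed { line, including the catch behavior, is an assumption, not the commit's actual code:

// Sketch of the try-with-resources close idiom used by closeAutoClosable.
// try-with-resources calls close() when the block exits and silently skips
// a null resource, so callers need no explicit null checks before closing.
static void closeQuietly(AutoCloseable closeable, String name) {
  try (AutoCloseable closer = closeable) {
    // Stand-in for the truncated LOG.info("Closed {}", name) call; the
    // commit's exact message text is not visible in this email.
    System.out.println("Closed " + name);
  } catch (Exception e) {
    // Assumed handling: surface the failure. The commit's actual catch
    // clause lies past the truncation point.
    throw new RuntimeException("Failed to close " + name, e);
  }
}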