This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7bb19451dadea0259f6658c7ccc7f157fa0cd576 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Tue Jan 15 17:06:51 2019 +0100 Remove bundleSize parameter and always use spark default parallelism --- .../spark/structuredstreaming/SparkPipelineOptions.java | 10 ---------- .../translation/batch/DatasetSourceBatch.java | 5 +---- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java index 2e6653b..442ccf8 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java @@ -73,16 +73,6 @@ public interface SparkPipelineOptions void setCheckpointDurationMillis(Long durationMillis); - @Description( - "If set bundleSize will be used for splitting BoundedSources, otherwise default to " - + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with " + "Spark dynamicAllocation.") - @Default.Long(0) - Long getBundleSize(); - - @Experimental - void setBundleSize(Long value); - @Description("Enable/disable sending aggregator values to Spark's metric sinks") @Default.Boolean(true) Boolean getEnableSparkMetricSinks(); diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index d966efb..3f6f219 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -113,10 +113,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { List<InputPartition<InternalRow>> result = new ArrayList<>(); long desiredSizeBytes; try { - desiredSizeBytes = - (sparkPipelineOptions.getBundleSize() == null) - ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions - : sparkPipelineOptions.getBundleSize(); + desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions; List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions); for (BoundedSource<T> split : splits) { result.add(