This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ca88d547d54ec9e1f5831894106dce076205acbd
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Jan 15 16:42:09 2019 +0100

    Cleaning
---
 .../spark/structuredstreaming/SparkRunner.java     |   4 +-
 .../translation/PipelineTranslator.java            |   4 +-
 .../translation/batch/DatasetSourceBatch.java      |   2 +-
 .../translation/batch/PipelineTranslatorBatch.java |   5 +-
 .../translation/batch/TranslationContextBatch.java |  40 -------
 .../batch/mocks/DatasetSourceMockBatch.java        |  94 ---------------
 .../batch/mocks/ReadSourceTranslatorMockBatch.java |  62 ----------
 .../translation/batch/mocks/package-info.java      |  20 ----
 .../streaming/DatasetSourceStreaming.java          | 133 +++------------------
 ...lator.java => PipelineTranslatorStreaming.java} |   6 +-
 .../streaming/StreamingTranslationContext.java     |  29 -----
 11 files changed, 27 insertions(+), 372 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 97aa4d8..934c6d2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -22,7 +22,7 @@ import static org.apache.beam.runners.core.construction.PipelineResources.detect
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -124,7 +124,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
     PipelineTranslator pipelineTranslator =
         options.isStreaming()
-            ? new StreamingPipelineTranslator(options)
+            ? new PipelineTranslatorStreaming(options)
             : new PipelineTranslatorBatch(options);
     pipelineTranslator.translate(pipeline);
     return pipelineTranslator.getTranslationContext();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index e0924e3..7fbbfe6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * {@link Pipeline.PipelineVisitor} that translates the Beam operators to their Spark counterparts.
  * It also does the pipeline preparation: mode detection, transforms replacement, classpath
  * preparation. If we have a streaming job, it is instantiated as a {@link
- * StreamingPipelineTranslator}. If we have a batch job, it is instantiated as a {@link
+ * PipelineTranslatorStreaming}. If we have a batch job, it is instantiated as a {@link
  * PipelineTranslatorBatch}.
  */
 public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
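For context on the rename above: PipelineTranslator is a Beam
Pipeline.PipelineVisitor, and PipelineTranslatorBatch /
PipelineTranslatorStreaming are the mode-specific subclasses the runner
picks between. A minimal sketch of that dispatch pattern follows; the
method bodies are simplified assumptions for illustration, not the actual
Beam code:

    // Sketch: a PipelineVisitor walks the pipeline topologically and
    // defers each primitive transform to a mode-specific registry.
    public abstract class PipelineTranslatorSketch
        extends Pipeline.PipelineVisitor.Defaults {

      protected TranslationContext translationContext;

      public void translate(Pipeline pipeline) {
        // Each primitive transform ends up in visitPrimitiveTransform.
        pipeline.traverseTopologically(this);
      }

      @Override
      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        // Subclasses resolve a translator from their registry; null
        // means the transform is unsupported in that mode.
        TransformTranslator<?> translator = getTransformTranslator(node);
        if (translator == null) {
          throw new UnsupportedOperationException(
              "The transform " + node.getFullName() + " is currently not supported.");
        }
        // ...apply the translator against translationContext...
      }

      protected abstract TransformTranslator<?> getTransformTranslator(
          TransformHierarchy.Node node);
    }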
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 2a13d98..d966efb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -183,7 +183,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-return InternalRow.apply(asScalaBuffer(list).toList());
+      return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
     @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..99d34a6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -24,13 +24,14 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in batch mode. This contains
- * only the components specific to batch: {@link TranslationContextBatch}, registry of batch {@link
+ * only the components specific to batch: registry of batch {@link
  * TransformTranslator} and registry lookup code.
  */
 public class PipelineTranslatorBatch extends PipelineTranslator {
@@ -69,7 +70,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
-    translationContext = new TranslationContextBatch(options);
+    translationContext = new TranslationContext(options);
   }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
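The "registry of batch TransformTranslator and registry lookup code"
mentioned in the javadoc above can be pictured as a map keyed by
transform URN. A hypothetical sketch (the registry field name and
GroupByKeyTranslatorBatch are illustrative assumptions):

    // Illustrative registry + lookup, keyed by Beam transform URN.
    private static final Map<String, TransformTranslator> TRANSFORM_TRANSLATORS =
        new HashMap<>();

    static {
      // One entry per supported primitive transform.
      TRANSFORM_TRANSLATORS.put(
          PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
          new GroupByKeyTranslatorBatch());
    }

    @Override
    protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
      PTransform<?, ?> transform = node.getTransform();
      if (transform == null) {
        return null; // the hierarchy root carries no transform
      }
      String urn = PTransformTranslation.urnForTransformOrNull(transform);
      return (urn == null) ? null : TRANSFORM_TRANSLATORS.get(urn);
    }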
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
deleted file mode 100644
index e849471..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.sql.Dataset;
-
-/** This class contains only batch specific context components. */
-public class TranslationContextBatch extends TranslationContext {
-
-  /**
-   * For keeping track about which DataSets don't have a successor. We need to terminate these with
-   * a discarding sink because the Beam model allows dangling operations.
-   */
-  private final Map<PValue, Dataset<?>> danglingDataSets;
-
-  public TranslationContextBatch(SparkPipelineOptions options) {
-    super(options);
-    this.danglingDataSets = new HashMap<>();
-  }
-}
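The javadoc of the just-deleted danglingDataSets field records the
reasoning: the Beam model allows dangling operations, while Spark only
evaluates a Dataset that feeds a sink, so leaf datasets must be
terminated explicitly. A hypothetical sketch of such a discarding sink
(assumed wiring, not code from this commit):

    // Force evaluation of datasets that have no successor while
    // keeping no output: a discarding sink.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void terminateDanglingDataSets(Map<PValue, Dataset<?>> danglingDataSets) {
      for (Dataset dataset : danglingDataSets.values()) {
        dataset.foreach((ForeachFunction) value -> { /* discard */ });
      }
    }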
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
deleted file mode 100644
index 81aead2..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.types.StructType;
-import org.joda.time.Instant;
-
-/**
- * This is a mock source that gives values between 0 and 999.
- */
-public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
-
-  private DatasetSourceMockBatch() {
-  }
-
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
-    return new DatasetReader();
-  }
-
-  /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetReader implements DataSourceReader {
-
-    @Override public StructType readSchema() {
-      return new StructType();
-    }
-
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      result.add(new InputPartition<InternalRow>() {
-
-        @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetPartitionReaderMock();
-        }
-      });
-      return result;
-    }
-  }
-
-  /** This class is a mocked reader. */
-  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
-
-    private ArrayList<Integer> values;
-    private int currentIndex = 0;
-
-    private DatasetPartitionReaderMock() {
-      for (int i = 0; i < 1000; i++){
-        values.add(i);
-      }
-    }
-
-    @Override public boolean next() throws IOException {
-      currentIndex++;
-      return (currentIndex <= values.size());
-    }
-
-    @Override public void close() throws IOException {
-    }
-
-    @Override public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-  }
-}
\ No newline at end of file
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
deleted file mode 100644
index 5cfb755..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Mock translator that generates a source of 0 to 999 and prints it.
- * @param <T>
- */
-public class ReadSourceTranslatorMockBatch<T>
-    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
-
-  private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void translateTransform(
-      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
-    SparkSession sparkSession = context.getSparkSession();
-
-    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
-
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
-    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
-    // be created ?
-    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
-
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDatasetRaw(output, dataset);
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
deleted file mode 100644
index 3c00aaf..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Source mocks, only temporary waiting for the proper source to be done. */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 3175aed..69d85d6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -17,25 +17,15 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
@@ -44,144 +34,53 @@ import org.apache.spark.sql.types.StructType;
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
  * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
-
-  private int numPartitions;
-  private Long bundleSize;
-  private TranslationContext context;
-  private BoundedSource<T> source;
-
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
 
-  @Override
-  public MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
-    return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+      String checkpointLocation, DataSourceOptions options) {
+    return new DatasetMicroBatchReader(checkpointLocation, options);
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetMicroBatchReader implements MicroBatchReader {
+  private static class DatasetMicroBatchReader implements MicroBatchReader {
 
-    private Optional<StructType> schema;
-    private String checkpointLocation;
-    private DataSourceOptions options;
-
-    private DatasetMicroBatchReader(
-        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    private DatasetMicroBatchReader(String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override
-    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override
-    public Offset getStartOffset() {
+    @Override public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
    }
 
-    @Override
-    public Offset getEndOffset() {
+    @Override public Offset getEndOffset() {
      //TODO extension point for SDF
      return null;
    }
 
-    @Override
-    public Offset deserializeOffset(String json) {
+    @Override public Offset deserializeOffset(String json) {
      //TODO extension point for SDF
      return null;
    }
 
-    @Override
-    public void commit(Offset end) {
+    @Override public void commit(Offset end) {
      //TODO no more to read after end Offset
    }
 
-    @Override
-    public void stop() {}
+    @Override public void stop() {
+    }
 
-    @Override
-    public StructType readSchema() {
+    @Override public StructType readSchema() {
      return null;
    }
 
-    @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      long desiredSizeBytes;
-      SparkPipelineOptions options = context.getOptions();
-      try {
-        desiredSizeBytes =
-            (bundleSize == null)
-                ? source.getEstimatedSizeBytes(options) / numPartitions
-                : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
-        for (BoundedSource<T> source : sources) {
-          result.add(
-              new InputPartition<InternalRow>() {
-
-                @Override
-                public InputPartitionReader<InternalRow> createPartitionReader() {
-                  BoundedReader<T> reader = null;
-                  try {
-                    reader = source.createReader(options);
-                  } catch (IOException e) {
-                    throw new RuntimeException(
-                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
-                  }
-                  return new DatasetMicroBatchPartitionReader(reader);
-                }
-              });
-        }
-        return result;
-
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
-      }
+    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+      return null;
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader}. */
-  private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
-
-    BoundedReader<T> reader;
-    private boolean started;
-    private boolean closed;
-
-    DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
-      this.reader = reader;
-      this.started = false;
-      this.closed = false;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      if (!started) {
-        started = true;
-        return reader.start();
-      } else {
-        return !closed && reader.advance();
-      }
-    }
-
-    @Override
-    public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(
-          WindowedValue.timestampedValueInGlobalWindow(
-              reader.getCurrent(), reader.getCurrentTimestamp()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      reader.close();
-    }
-  }
 }
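The partition reader deleted above remains the clearest illustration of
how Beam reader semantics (start() once, then advance()) map onto
Spark's InputPartitionReader (next()/get()). A condensed version of the
removed code, trimmed for illustration:

    private static class BeamPartitionReader<T> implements InputPartitionReader<InternalRow> {

      private final BoundedSource.BoundedReader<T> reader;
      private boolean started;

      BeamPartitionReader(BoundedSource.BoundedReader<T> reader) {
        this.reader = reader;
      }

      @Override
      public boolean next() throws IOException {
        // Beam requires start() before the first advance().
        if (!started) {
          started = true;
          return reader.start();
        }
        return reader.advance();
      }

      @Override
      public InternalRow get() {
        // One WindowedValue object per Spark row.
        List<Object> list = new ArrayList<>();
        list.add(
            WindowedValue.timestampedValueInGlobalWindow(
                reader.getCurrent(), reader.getCurrentTimestamp()));
        return InternalRow.apply(asScalaBuffer(list).toList());
      }

      @Override
      public void close() throws IOException {
        reader.close();
      }
    }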
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
similarity index 87%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
index 437aa25..20cefed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
@@ -25,12 +25,12 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in streaming mode. This
- * contains only the components specific to streaming: {@link StreamingTranslationContext}, registry
+ * contains only the components specific to streaming: registry
  * of batch {@link TransformTranslator} and registry lookup code.
  */
-public class StreamingPipelineTranslator extends PipelineTranslator {
+public class PipelineTranslatorStreaming extends PipelineTranslator {
 
-  public StreamingPipelineTranslator(SparkPipelineOptions options) {}
+  public PipelineTranslatorStreaming(SparkPipelineOptions options) {}
 
   @Override
   protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
deleted file mode 100644
index f827cc4..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
-
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-
-/** This class contains only streaming specific context components. */
-public class StreamingTranslationContext extends TranslationContext {
-
-  public StreamingTranslationContext(SparkPipelineOptions options) {
-    super(options);
-  }
-}