[cleanup] various small improvements - removed duplicate declarations in pom.xml - removed reference to junit.framework.* - removed 'static' from interface declarations
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/602d8fe9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/602d8fe9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/602d8fe9 Branch: refs/heads/master Commit: 602d8fe9b69dba2300a573559e7b60e286f116d1 Parents: 70ae13c Author: smarthi <smar...@apache.org> Authored: Thu Feb 11 17:33:51 2016 -0500 Committer: Davor Bonaci <davorbon...@users.noreply.github.com> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- runners/flink/pom.xml | 5 ----- .../flink/dataflow/FlinkJobExecutionEnvironment.java | 3 +-- .../flink/dataflow/examples/streaming/AutoComplete.java | 6 ++---- .../flink/dataflow/examples/streaming/JoinExamples.java | 6 +++--- .../examples/streaming/KafkaWindowedWordCountExample.java | 2 +- .../flink/dataflow/examples/streaming/WindowedWordCount.java | 3 +-- .../dataflow/translation/FlinkBatchPipelineTranslator.java | 3 --- .../dataflow/translation/FlinkBatchTransformTranslators.java | 2 +- .../translation/FlinkStreamingPipelineTranslator.java | 4 ---- .../translation/FlinkStreamingTransformTranslators.java | 5 ++--- .../translation/FlinkStreamingTranslationContext.java | 6 ++---- .../functions/FlinkCoGroupKeyedListAggregator.java | 2 +- .../flink/dataflow/translation/wrappers/SinkOutputFormat.java | 2 -- .../dataflow/translation/wrappers/SourceInputFormat.java | 7 +++---- .../java/com/dataartisans/flink/dataflow/WriteSinkITCase.java | 2 +- .../com/dataartisans/flink/dataflow/util/JoinExamples.java | 2 +- 16 files changed, 19 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 14693b8..cb784a0 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -73,11 +73,6 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${flink.version}</version> <scope>test</scope> <type>test-jar</type> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java index 66d60fa..91b2f64 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,7 +223,7 @@ public class FlinkJobExecutionEnvironment { this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000); } - private final void checkInitializationState() { + private void checkInitializationState() { if (this.options == null) { throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java index 0245a7b..711d9fb 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java @@ -36,8 +36,6 @@ import org.joda.time.Duration; import java.io.IOException; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * To run the example, first open a socket on a terminal by executing the command: @@ -242,7 +240,7 @@ public class AutoComplete { public void processElement(ProcessContext c) { String word = c.element().value; for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - KV kv = KV.of(word.substring(0, i), c.element()); + KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element()); c.output(kv); } } @@ -349,7 +347,7 @@ public class AutoComplete { * * <p> Inherits standard Dataflow configuration options. */ - private static interface Options extends WindowedWordCount.StreamingWordCountOptions { + private interface Options extends WindowedWordCount.StreamingWordCountOptions { @Description("Whether to use the recursive algorithm") @Default.Boolean(true) Boolean getRecursive(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java index b0cc4fa..9a5db64 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java @@ -53,8 +53,8 @@ public class JoinExamples { static PCollection<String> joinEvents(PCollection<String> streamA, PCollection<String> streamB) throws Exception { - final TupleTag<String> firstInfoTag = new TupleTag<String>(); - final TupleTag<String> secondInfoTag = new TupleTag<String>(); + final TupleTag<String> firstInfoTag = new TupleTag<>(); + final TupleTag<String> secondInfoTag = new TupleTag<>(); // transform both input collections to tuple collections, where the keys are country // codes in both cases. @@ -118,7 +118,7 @@ public class JoinExamples { } } - private static interface Options extends WindowedWordCount.StreamingWordCountOptions { + private interface Options extends WindowedWordCount.StreamingWordCountOptions { } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java index 46c9bd6..42d3d88 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java @@ -72,7 +72,7 @@ public class KafkaWindowedWordCountExample { } } - public static interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { @Description("The Kafka topic to read from") @Default.String(KAFKA_TOPIC) String getKafkaTopic(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java index 1d4a44b..b539245 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java @@ -28,7 +28,6 @@ import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import org.joda.time.Duration; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +80,7 @@ public class WindowedWordCount { } } - public static interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options { + public interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options { @Description("Sliding window duration, in seconds") @Default.Long(WINDOW_SIZE) Long getWindowSize(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java index 8c0183e..a1e4410 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java @@ -115,9 +115,6 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { } private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { - if (this.batchContext == null) { - throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized."); - } @SuppressWarnings("unchecked") T typedTransform = (T) transform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java index 8f64730..9a43d05 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java @@ -283,7 +283,7 @@ public class FlinkBatchTransformTranslators { private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> { @Override public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) { - PValue input = (PValue) context.getInput(transform); + PValue input = context.getInput(transform); DataSet<?> inputDataSet = context.getInputDataSet(input); inputDataSet.printOnTaskManager(transform.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java index c8760c7..a8f4226 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java @@ -19,7 +19,6 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; import com.google.cloud.dataflow.sdk.values.PValue; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -101,9 +100,6 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { } private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) { - if (this.streamingContext == null) { - throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized."); - } @SuppressWarnings("unchecked") T typedTransform = (T) transform; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java index 4c8cd4b..17583cd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java @@ -76,8 +76,7 @@ public class FlinkStreamingTransformTranslators { } public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); - return translator; + return TRANSLATORS.get(transform.getClass()); } // -------------------------------------------------------------------------------------------- @@ -123,7 +122,7 @@ public class FlinkStreamingTransformTranslators { public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) { PCollection<T> output = context.getOutput(transform); - DataStream<WindowedValue<T>> source = null; + DataStream<WindowedValue<T>> source; if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource(); source = context.getExecutionEnvironment() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java index 83ea575..df68e50 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java @@ -74,13 +74,11 @@ public class FlinkStreamingTranslationContext { @SuppressWarnings("unchecked") public <I extends PInput> I getInput(PTransform<I, ?> transform) { - I input = (I) currentTransform.getInput(); - return input; + return (I) currentTransform.getInput(); } @SuppressWarnings("unchecked") public <O extends POutput> O getOutput(PTransform<?, O> transform) { - O output = (O) currentTransform.getOutput(); - return output; + return (O) currentTransform.getOutput(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java index f859348..4c7fefd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java @@ -53,6 +53,6 @@ public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction k = entry.getKey(); result.add(new RawUnionValue(index2, entry.getValue())); } - out.collect(KV.of(k, new CoGbkResult(schema, (List) result))); + out.collect(KV.of(k, new CoGbkResult(schema, result))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java index b10c86f..ec8c186 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java @@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.io.Sink; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions; import com.google.cloud.dataflow.sdk.transforms.Write; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.AbstractID; @@ -32,7 +31,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; -import java.util.UUID; /** * Wrapper class to use generic Write.Bound transforms as sinks. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java index afb15da..b3eca96 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java @@ -45,7 +45,6 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> private final BoundedSource<T> initialSource; private transient PipelineOptions options; - private final Coder<T> coder; private BoundedSource.BoundedReader<T> reader = null; private boolean reachedEnd = true; @@ -53,7 +52,7 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options, Coder<T> coder) { this.initialSource = initialSource; this.options = options; - this.coder = coder; + Coder<T> coder1 = coder; } private void writeObject(ObjectOutputStream out) @@ -111,12 +110,12 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> @Override @SuppressWarnings("unchecked") public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { - long desiredSizeBytes = 10000; + long desiredSizeBytes; try { desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options); - List<SourceInputSplit<T>> splits = new ArrayList<SourceInputSplit<T>>(); + List<SourceInputSplit<T>> splits = new ArrayList<>(); int splitCount = 0; for (Source<T> shard: shards) { splits.add(new SourceInputSplit<>(shard, splitCount++)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java index c8302e8..205fe9b 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java @@ -32,7 +32,7 @@ import java.io.File; import java.io.PrintWriter; import java.net.URI; -import static junit.framework.Assert.*; +import static org.junit.Assert.*; /** * Tests the translation of custom Write.Bound sinks. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/602d8fe9/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java index 74f754b..aa5623d 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java @@ -76,7 +76,7 @@ public class JoinExamples { KV<String, CoGbkResult> e = c.element(); CoGbkResult val = e.getValue(); String countryCode = e.getKey(); - String countryName = "none"; + String countryName; countryName = e.getValue().getOnly(countryInfoTag, "Kostas"); for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { // Generate a string that combines information from both collection values