Repository: incubator-beam Updated Branches: refs/heads/master ef750c0f8 -> 66faf74d0
[BEAM-931] Fix Findbugs Warnings in Flink Runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a609a19e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a609a19e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a609a19e Branch: refs/heads/master Commit: a609a19e6df763c0aa77d83f05e21ec343f6dcdb Parents: ef750c0 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Nov 8 11:56:45 2016 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Nov 10 10:29:43 2016 +0100 ---------------------------------------------------------------------- .../org/apache/beam/runners/flink/examples/TFIDF.java | 11 +++++++++-- runners/flink/pom.xml | 8 -------- .../java/org/apache/beam/runners/flink/FlinkRunner.java | 4 ++-- .../translation/FlinkStreamingTransformTranslators.java | 2 -- .../wrappers/SerializableFnAggregatorWrapper.java | 7 +++++++ .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../wrappers/streaming/io/UnboundedSocketSource.java | 5 +---- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 3 ++- 8 files changed, 22 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index cf5c8f5..b946d98 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -129,7 +129,12 @@ public class TFIDF { Set<URI> uris = new HashSet<>(); if (absoluteUri.getScheme().equals("file")) { File directory = new File(absoluteUri); - for (String entry : directory.list()) { + String[] directoryListing = directory.list(); + if (directoryListing == null) { + throw new IOException( + "Directory " + absoluteUri + " is not a valid path or IO Error occurred."); + } + for (String entry : directoryListing) { File path = new File(directory, entry); uris.add(path.toURI()); } @@ -157,7 +162,9 @@ public class TFIDF { extends PTransform<PBegin, PCollection<KV<URI, String>>> { private static final long serialVersionUID = 0; - private Iterable<URI> uris; + // transient because PTransform is not really meant to be serialized. + // see note on PTransform + private final transient Iterable<URI> uris; public ReadDocuments(Iterable<URI> uris) { this.uris = uris; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 1b73922..f93af85 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -96,14 +96,6 @@ </executions> </plugin> - <!-- BEAM-931 --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> </plugins> </pluginManagement> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 12e21c7..488c170 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -301,7 +301,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { private static class StreamingViewAsMap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - private final FlinkRunner runner; + private final transient FlinkRunner runner; @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) { @@ -343,7 +343,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { private static class StreamingViewAsMultimap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - private final FlinkRunner runner; + private final transient FlinkRunner runner; /** * Builds an instance of this class from the overridden transform. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 4b819b7..069162f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -249,7 +249,6 @@ public class FlinkStreamingTransformTranslators { }}); } else { try { - transform.getSource(); UnboundedSourceWrapper<T, ?> sourceWrapper = new UnboundedSourceWrapper<>( context.getPipelineOptions(), @@ -279,7 +278,6 @@ public class FlinkStreamingTransformTranslators { DataStream<WindowedValue<T>> source; try { - transform.getSource(); BoundedSourceWrapper<T> sourceWrapper = new BoundedSourceWrapper<>( context.getPipelineOptions(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java index 25d777a..70d97e3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -80,6 +80,13 @@ public class SerializableFnAggregatorWrapper<InputT, OutputT> @Override public Accumulator<InputT, Serializable> clone() { + try { + super.clone(); + } catch (CloneNotSupportedException e) { + // Flink Accumulators cannot throw CloneNotSupportedException, work around that. + throw new RuntimeException(e); + } + // copy it by merging OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa)); SerializableFnAggregatorWrapper<InputT, OutputT> result = new http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 5debd4b..432dc64 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -92,7 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> private transient Multiset<Long> processingTimeTimerTimestamps; private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; - private FlinkStateInternals<K> stateInternals; + private transient FlinkStateInternals<K> stateInternals; private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java index 96b5138..ed03dda 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.io.Serializable; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; @@ -131,10 +130,8 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check /** * Unbounded socket reader. */ - public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> - implements Serializable { + public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> { - private static final long serialVersionUID = 7526472295622776147L; private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); private final UnboundedSocketSource source; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a609a19e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 68a83e8..af955ba 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -240,7 +240,8 @@ public class UnboundedSourceWrapper< // Flink will interrupt us at some point //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (waitLock) { - waitLock.wait(); + // don't wait indefinitely, in case something goes horribly wrong + waitLock.wait(1000); } } catch (InterruptedException e) { if (!isRunning) {