Repository: incubator-beam Updated Branches: refs/heads/apex-runner c08ebbe79 -> 99001575d
Fix findbugs issues. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99001575 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99001575 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99001575 Branch: refs/heads/apex-runner Commit: 99001575d266798cb5537c8a025735a095ac535e Parents: c08ebbe Author: Thomas Weise <t...@apache.org> Authored: Tue Nov 8 05:02:26 2016 +0100 Committer: Thomas Weise <t...@apache.org> Committed: Tue Nov 8 09:22:13 2016 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/runners/apex/ApexRunner.java | 5 +++-- .../java/org/apache/beam/runners/apex/ApexRunnerResult.java | 4 ++-- .../apex/translation/operators/ApexGroupByKeyOperator.java | 5 +---- .../runners/apex/translation/operators/ApexParDoOperator.java | 2 +- .../runners/apex/translation/ApexGroupByKeyOperatorTest.java | 5 +++++ 5 files changed, 12 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index b42dddf..5ce4fef 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -28,6 +28,7 @@ import com.google.common.base.Throwables; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.runners.apex.translation.ApexPipelineTranslator; import org.apache.beam.runners.core.AssignWindows; @@ -73,7 +74,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { * Holds any most resent assertion error that was raised while processing elements. * Used in the unit test driver in embedded mode to propagate the exception. */ - public static volatile AssertionError assertionError; + public static final AtomicReference<AssertionError> ASSERTION_ERROR = new AtomicReference<>(); public ApexRunner(ApexPipelineOptions options) { this.options = options; @@ -141,7 +142,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { // turns off timeout checking for operator progress lc.setHeartbeatMonitoringEnabled(false); } - assertionError = null; + ApexRunner.ASSERTION_ERROR.set(null); lc.runAsync(); return new ApexRunnerResult(lma.getDAG(), lc); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 3ae69f2..18b50bc 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -95,8 +95,8 @@ public class ApexRunnerResult implements PipelineResult { appDoneField = ctrl.getClass().getDeclaredField("appDone"); appDoneField.setAccessible(true); while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) { - if (ApexRunner.assertionError != null) { - throw ApexRunner.assertionError; + if (ApexRunner.ASSERTION_ERROR.get() != null) { + throw ApexRunner.ASSERTION_ERROR.get(); } Thread.sleep(500); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 1b5e693..8fbfb03 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -32,7 +32,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -467,9 +466,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } - private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable { - private static final long serialVersionUID = 1L; - + private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K> { @Override public StateInternals<K> stateInternalsForKey(K key) { return getStateInternalsForKey(key); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 44e7b11..637c3ff 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -240,7 +240,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements return pushedBack; } catch (UserCodeException ue) { if (ue.getCause() instanceof AssertionError) { - ApexRunner.assertionError = (AssertionError) ue.getCause(); + ApexRunner.ASSERTION_ERROR.set((AssertionError) ue.getCause()); } throw ue; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99001575/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java index c0ddb83..fb80d0c 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.apex.translation; import com.datatorrent.api.Sink; +import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.collect.Lists; import java.util.List; @@ -68,6 +69,10 @@ public class ApexGroupByKeyOperatorTest { input, new ApexStateInternals.ApexStateInternalsFactory<String>() ); + operator.setup(null); + operator.beginWindow(1); + Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); + final List<Object> results = Lists.newArrayList(); Sink<Object> sink = new Sink<Object>() { @Override