Repository: incubator-beam Updated Branches: refs/heads/master 4755c5a78 -> bb086b8d3
Fix bug in PipelineOptions DisplayData serialization PipelineOptions has been improved to generate display data to be consumed by a runner and used for display. However, there was a bug in the ProxyInvocationHandler implementation of PipelineOptions display data which caused NullPointerExceptions when generating display data from PipelineOptions that were previously deserialized from JSON. This change also makes our error handling for display data exceptions consistent across the Dataflow runner: exceptions thrown during display data population will propagate out and cause the pipeline to fail. This is consistent with other user code which may throw exceptions at pipeline construction time. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e669c44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e669c44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e669c44 Branch: refs/heads/master Commit: 1e669c44c9d2448b55f5bdba3dcff1831b2cd8b4 Parents: 4755c5a Author: Scott Wegner <sweg...@google.com> Authored: Thu May 19 09:17:37 2016 -0700 Committer: Scott Wegner <sweg...@google.com> Committed: Fri May 20 17:20:29 2016 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 50 +---------- .../DataflowPipelineTranslatorTest.java | 63 -------------- .../sdk/options/ProxyInvocationHandler.java | 4 +- .../sdk/transforms/display/DisplayData.java | 14 +-- .../sdk/options/ProxyInvocationHandlerTest.java | 6 +- .../sdk/transforms/display/DisplayDataTest.java | 91 ++++++++++++++------ 6 files changed, 83 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 7f67393..f5fefc0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -29,7 +29,6 @@ import static org.apache.beam.sdk.util.Structs.addString; import static org.apache.beam.sdk.util.Structs.getString; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.internal.ReadTranslator; @@ -87,8 +86,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -726,18 +723,7 @@ public class DataflowPipelineTranslator { } private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { - DisplayData displayData; - try { - displayData = DisplayData.from(hasDisplayData); - } catch (Exception e) { - String msg = String.format("Exception thrown while collecting display data for step: %s. 
" - + "Display data will be not be available for this step.", stepName); - DisplayDataException displayDataException = new DisplayDataException(msg, e); - LOG.warn(msg, displayDataException); - - displayData = displayDataException.asDisplayData(); - } - + DisplayData displayData = DisplayData.from(hasDisplayData); List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class); addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } @@ -1056,38 +1042,4 @@ public class DataflowPipelineTranslator { context.addOutput(tag.getId(), output); } } - - /** - * Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner. - */ - static class DisplayDataException extends Exception implements HasDisplayData { - public DisplayDataException(String message, Throwable cause) { - super(checkNotNull(message), checkNotNull(cause)); - } - - /** - * Retrieve a display data representation of the exception, which can be submitted to - * the service in place of the actual display data. 
- */ - public DisplayData asDisplayData() { - return DisplayData.from(this); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - Throwable cause = getCause(); - builder - .add(DisplayData.item("exceptionMessage", getMessage())) - .add(DisplayData.item("exceptionType", cause.getClass())) - .add(DisplayData.item("exceptionCause", cause.getMessage())) - .add(DisplayData.item("stackTrace", stackTraceToString())); - } - - private String stackTraceToString() { - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - printStackTrace(printWriter); - return stringWriter.toString(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 58c6f75..165d2b5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -23,9 +23,7 @@ import static org.apache.beam.sdk.util.Structs.getString; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -50,7 +48,6 @@ import org.apache.beam.sdk.coders.VoidCoder; import 
org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.RecordingPipelineVisitor; -import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -80,7 +77,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -106,9 +102,7 @@ import java.util.Map; */ @RunWith(JUnit4.class) public class DataflowPipelineTranslatorTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - @Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. 
@@ -973,61 +967,4 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } - - @Test - public void testCapturesDisplayDataExceptions() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - Pipeline pipeline = Pipeline.create(options); - - final RuntimeException displayDataException = new RuntimeException("foobar"); - pipeline - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(new DoFn<Integer, Integer>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - throw displayDataException; - } - })); - - Job job = translator.translate( - pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), - Collections.<DataflowPackage>emptyList()).getJob(); - - String expectedMessage = "Display data will be not be available for this step"; - logs.verifyWarn(expectedMessage); - - List<Step> steps = job.getSteps(); - assertEquals("Job should have 2 steps", 2, steps.size()); - - @SuppressWarnings("unchecked") - Iterable<Map<String, String>> displayData = (Collection<Map<String, String>>) steps.get(1) - .getProperties().get("display_data"); - - String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName(); - Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "exceptionType"), - hasEntry("value", RuntimeException.class.getName())))); - - Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "exceptionMessage"), - hasEntry(is("value"), Matchers.containsString(expectedMessage))))); - - 
Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "exceptionCause"), - hasEntry("value", "foobar")))); - - Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "stackTrace")))); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 159eb5b..3292a7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -315,6 +315,7 @@ class ProxyInvocationHandler implements InvocationHandler { } Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod()); + value = value == null ? 
"" : value; DisplayData.Type type = DisplayData.inferType(value); if (type != null) { builder.add(DisplayData.item(jsonOption.getKey(), type, value) @@ -552,7 +553,8 @@ class ProxyInvocationHandler implements InvocationHandler { jgen.writeObject(serializableOptions); List<Map<String, Object>> serializedDisplayData = Lists.newArrayList(); - for (DisplayData.Item<?> item : DisplayData.from(value).items()) { + DisplayData displayData = DisplayData.from(value); + for (DisplayData.Item<?> item : displayData.items()) { @SuppressWarnings("unchecked") Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class); serializedDisplayData.add(serializedItem); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index dc6e381..9e9bdbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -72,10 +72,6 @@ public class DisplayData implements Serializable { * Collect the {@link DisplayData} from a component. This will traverse all subcomponents * specified via {@link Builder#include} in the given component. Data in this component will be in * a namespace derived from the component. - * - * <p>Pipeline runners should call this method in order to collect display data. While it should - * be safe to call {@code DisplayData.from} on any component which implements it, runners should - * be resilient to exceptions thrown while collecting display data. 
*/ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); @@ -603,7 +599,15 @@ public class DisplayData implements Serializable { if (newComponent) { String prevNs = this.latestNs; this.latestNs = namespace; - subComponent.populateDisplayData(this); + + try { + subComponent.populateDisplayData(this); + } catch (Throwable e) { + String msg = String.format("Error while populating display data for component: %s", + namespace); + throw new RuntimeException(msg, e); + } + this.latestNs = prevNs; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 6fc9700..110f30a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -863,12 +863,16 @@ public class ProxyInvocationHandlerTest { } @Test - public void testDisplayDataNullValuesConvertedToEmptyString() { + public void testDisplayDataNullValuesConvertedToEmptyString() throws Exception { FooOptions options = PipelineOptionsFactory.as(FooOptions.class); options.setFoo(null); DisplayData data = DisplayData.from(options); assertThat(data, hasDisplayItem("foo", "")); + + FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options); + DisplayData deserializedData = DisplayData.from(deserializedOptions); + assertThat(deserializedData, hasDisplayItem("foo", "")); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e669c44/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 21b2e33..478724b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -39,11 +40,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.RunnableOnService; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -69,7 +65,6 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -86,6 +81,7 @@ import java.util.regex.Pattern; @RunWith(JUnit4.class) public class DisplayDataTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime(); 
private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -413,7 +409,7 @@ public class DisplayDataTest implements Serializable { @Test public void testNullNamespaceOverride() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from(new HasDisplayData() { @Override @@ -516,7 +512,7 @@ public class DisplayDataTest implements Serializable { @Test public void testDuplicateKeyThrowsException() { - thrown.expect(IllegalArgumentException.class); + thrown.expectCause(isA(IllegalArgumentException.class)); DisplayData.from( new HasDisplayData() { @Override @@ -752,7 +748,7 @@ public class DisplayDataTest implements Serializable { } }; - thrown.expect(ClassCastException.class); + thrown.expectCause(isA(ClassCastException.class)); DisplayData.from(component); } @@ -838,7 +834,7 @@ public class DisplayDataTest implements Serializable { @Test public void testIncludeNull() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from( new HasDisplayData() { @Override @@ -856,7 +852,7 @@ public class DisplayDataTest implements Serializable { } }; - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from(new HasDisplayData() { @Override public void populateDisplayData(Builder builder) { @@ -867,7 +863,7 @@ public class DisplayDataTest implements Serializable { @Test public void testNullKey() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from( new HasDisplayData() { @Override @@ -968,23 +964,66 @@ public class DisplayDataTest implements Serializable { } /** - * Validate that all runners are resilient to exceptions thrown while retrieving display data. + * Verify that {@link DisplayData.Builder} can recover from exceptions thrown in user code. 
+ * This is not used within the Beam SDK since we want all code to produce valid DisplayData. + * This test just ensures it is possible to write custom code that does recover. */ @Test - @Category(RunnableOnService.class) - public void testRunnersResilientToDisplayDataExceptions() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> pCol = p - .apply(Create.of(1, 2, 3)) - .apply(new IdentityTransform<Integer>() { - @Override - public void populateDisplayData(Builder builder) { - throw new RuntimeException("bug!"); - } - }); + public void testCanRecoverFromBuildException() { + final HasDisplayData safeComponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("a", "a")); + } + }; + + final HasDisplayData failingComponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + throw new RuntimeException("oh noes!"); + } + }; + + DisplayData displayData = DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder + .add(DisplayData.item("b", "b")) + .add(DisplayData.item("c", "c")); + + try { + builder.include(failingComponent); + fail("Expected exception not thrown"); + } catch (RuntimeException e) { + // Expected + } + + builder + .include(safeComponent) + .add(DisplayData.item("d", "d")); + } + }); + + assertThat(displayData, hasDisplayItem("a")); + assertThat(displayData, hasDisplayItem("b")); + assertThat(displayData, hasDisplayItem("c")); + assertThat(displayData, hasDisplayItem("d")); + } - PAssert.that(pCol).containsInAnyOrder(1, 2, 3); - p.run(); + @Test + public void testExceptionMessage() { + final RuntimeException cause = new RuntimeException("oh noes!"); + HasDisplayData component = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + throw cause; + } + }; + + thrown.expectMessage(component.getClass().getName()); + thrown.expectCause(is(cause)); + 
+ DisplayData.from(component); } private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {