Repository: beam Updated Branches: refs/heads/master 97f32804c -> d4db66dd6
Fix a bug in AvroIO in which an anonymous SerializableFunction captured its enclosing instance, which contains an unserializable member (Schema) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e183b24e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e183b24e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e183b24e Branch: refs/heads/master Commit: e183b24ef9d07a6e2963c16c42c9d3a60166d3b0 Parents: 97f3280 Author: Yunqing Zhou <zhouyunq...@zhouyunqing-macbookpro3.roam.corp.google.com> Authored: Thu Aug 17 23:17:52 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Wed Aug 23 14:19:50 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 21 +++++++++++++------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 653b806..910d8e2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -762,15 +762,22 @@ public class AvroIO { return toResource(StaticValueProvider.of(outputPrefix)); } + private static class OutputPrefixToResourceId + implements SerializableFunction<String, ResourceId> { + @Override + public ResourceId apply(String input) { + return FileBasedSink.convertToFileResourceIfPossible(input); + } + } + /** Like {@link #to(String)}. 
*/ public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) { - return toResource(NestedValueProvider.of(outputPrefix, - new SerializableFunction<String, ResourceId>() { - @Override - public ResourceId apply(String input) { - return FileBasedSink.convertToFileResourceIfPossible(input); - } - })); + return toResource( + NestedValueProvider.of( + outputPrefix, + // The function cannot be created as an anonymous class here since the enclosed class + // may contain unserializable members. + new OutputPrefixToResourceId())); } /** Like {@link #to(ResourceId)}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index a96b6be..d0aa02c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -216,6 +217,26 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) + public void testAvroIOWriteAndReadViaValueProvider() throws Throwable { + List<GenericClass> values = + ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); + File outputFile = tmpFolder.newFile("output.avro"); + + ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath()); + + 
writePipeline + .apply(Create.of(values)) + .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding()); + writePipeline.run().waitUntilFinish(); + + PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider))) + .containsInAnyOrder(values); + + readPipeline.run(); + } + + @Test + @Category(NeedsRunner.class) public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable { List<GenericClass> firstValues = Lists.newArrayList(); List<GenericClass> secondValues = Lists.newArrayList();