Add WriteFiles translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3ed5a4f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3ed5a4f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3ed5a4f Branch: refs/heads/master Commit: f3ed5a4f4f05c67371b51e6f8742f554b282eedf Parents: 0093cf5 Author: Kenneth Knowles <k...@google.com> Authored: Tue May 30 14:43:25 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Jun 1 10:56:59 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 3 + .../construction/WriteFilesTranslation.java | 152 +++++++++++++++ .../construction/WriteFilesTranslationTest.java | 186 +++++++++++++++++++ 3 files changed, 341 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 00ea55e..99d1e85 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -50,6 +50,9 @@ public class PTransformTranslation { public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1"; public static final String WINDOW_TRANSFORM_URN = "urn:beam:transform:window:v1"; + // Less well-known. And where shall these live? 
+ public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; + private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java new file mode 100644 index 0000000..99b77ef --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi}
+ * representations; the {@link FileBasedSink} itself is carried as Java-serialized bytes.
+ */
+public class WriteFilesTranslation {
+
+  /** URN tagging a {@link FileBasedSink} encoded as opaque Java-serialized bytes. */
+  public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
+      "urn:beam:file_based_sink:javasdk:0.1";
+
+  @VisibleForTesting
+  static WriteFilesPayload toProto(WriteFiles<?> transform) {
+    return WriteFilesPayload.newBuilder()
+        .setSink(toProto(transform.getSink()))
+        .setWindowedWrites(transform.isWindowedWrites())
+        .setRunnerDeterminedSharding(
+            transform.getNumShards() == null && transform.getSharding() == null)
+        .build();
+  }
+
+  private static SdkFunctionSpec toProto(FileBasedSink<?> sink) {
+    return SdkFunctionSpec.newBuilder()
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink)))
+                            .build())))
+        .build();
+  }
+
+  @VisibleForTesting
+  static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+    checkArgument(
+        sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
+        "Cannot extract %s instance from %s with URN %s",
+        FileBasedSink.class.getSimpleName(),
+        FunctionSpec.class.getSimpleName(),
+        sinkProto.getSpec().getUrn());
+    // The parameter is the BytesValue of Java-serialized sink bytes packed by toProto(sink).
+    byte[] serializedSink =
+        sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+
+    return (FileBasedSink<?>)
+        SerializableUtils.deserializeFromByteArray(
+            serializedSink, FileBasedSink.class.getSimpleName());
+  }
+
+  public static <T> FileBasedSink<T> getSink(
+      AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return (FileBasedSink<T>) sinkFromProto(getWriteFilesPayload(transform).getSink());
+  }
+
+  public static <T> boolean isWindowedWrites(
+      AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return getWriteFilesPayload(transform).getWindowedWrites();
+  }
+
+  public static <T> boolean isRunnerDeterminedSharding(
+      AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return getWriteFilesPayload(transform).getRunnerDeterminedSharding();
+  }
+
+  private static <T> WriteFilesPayload getWriteFilesPayload(
+      AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+          transform)
+      throws IOException {
+    return PTransformTranslation.toProto(
+            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
+        .getSpec()
+        .getParameter()
+        .unpack(WriteFilesPayload.class);
+  }
+
+  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?>> {
+    @Override
+    public String getUrn(WriteFiles<?> transform) {
+      return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents components) {
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setParameter(Any.pack(toProto(transform.getTransform())))
+          .build();
+    }
+  }
+
+  /** Registers {@link WriteFilesTranslator} as the payload translator for {@link WriteFiles}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(WriteFiles.class, new WriteFilesTranslator());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3ed5a4f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
new file mode 100644
index 0000000..739034c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
+
+/** Tests for {@link WriteFilesTranslation}. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  WriteFilesTranslationTest.TestWriteFilesPayloadTranslation.class,
+})
+public class WriteFilesTranslationTest {
+
+  /** Tests for translating {@link WriteFiles} to/from {@link WriteFilesPayload} protos. */
+  @RunWith(Parameterized.class)
+  public static class TestWriteFilesPayloadTranslation {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<WriteFiles<?>> data() {
+      return ImmutableList.<WriteFiles<?>>of(
+          WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
+          WriteFiles.to(new DummySink()).withNumShards(17),
+          WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
+    }
+
+    @Parameter(0)
+    public WriteFiles<String> writeFiles;
+
+    public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+    @Test
+    @SuppressWarnings("unchecked") // narrows sinkFromProto's FileBasedSink<?> for equalTo
+    public void testEncodedProto() throws Exception {
+      RunnerApi.WriteFilesPayload payload = WriteFilesTranslation.toProto(writeFiles);
+
+      assertThat(
+          payload.getRunnerDeterminedSharding(),
+          equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null));
+
+      assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
+
+      assertThat(
+          (FileBasedSink<String>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          equalTo(writeFiles.getSink()));
+    }
+
+    @Test
+    public void testExtractionDirectFromTransform() throws Exception {
+      PCollection<String> input = p.apply(Create.of("hello"));
+      PDone output = input.apply(writeFiles);
+
+      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> appliedPTransform =
+          AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of(
+              "foo", input.expand(), output.expand(), writeFiles, p);
+
+      assertThat(
+          WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
+          equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null));
+
+      assertThat(
+          WriteFilesTranslation.isWindowedWrites(appliedPTransform),
+          equalTo(writeFiles.isWindowedWrites()));
+
+      assertThat(WriteFilesTranslation.getSink(appliedPTransform), equalTo(writeFiles.getSink()));
+    }
+  }
+
+  /**
+   * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
+   * any issues serializing mocks.
+   */
+  private static class DummySink extends FileBasedSink<String> {
+
+    DummySink() {
+      super(
+          StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
+          new DummyFilenamePolicy());
+    }
+
+    @Override
+    public WriteOperation<String> createWriteOperation() {
+      return new DummyWriteOperation(this);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof DummySink)) {
+        return false;
+      }
+
+      DummySink that = (DummySink) other;
+
+      return getFilenamePolicy().equals(that.getFilenamePolicy())
+          && getBaseOutputDirectoryProvider().isAccessible()
+          && that.getBaseOutputDirectoryProvider().isAccessible()
+          && getBaseOutputDirectoryProvider()
+              .get()
+              .equals(that.getBaseOutputDirectoryProvider().get());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          DummySink.class,
+          getFilenamePolicy(),
+          getBaseOutputDirectoryProvider().isAccessible()
+              ? getBaseOutputDirectoryProvider().get()
+              : null);
+    }
+  }
+
+  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<String> {
+    public DummyWriteOperation(FileBasedSink<String> sink) {
+      super(sink);
+    }
+
+    @Override
+    public FileBasedSink.Writer<String> createWriter() throws Exception {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+  }
+
+  private static class DummyFilenamePolicy extends FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension) {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+
+    @Nullable
+    @Override
+    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
+      throw new UnsupportedOperationException("Should never be called.");
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof DummyFilenamePolicy;
+    }
+
+    @Override
+    public int hashCode() {
+      return DummyFilenamePolicy.class.hashCode();
+    }
+  }
+}