Add SdkComponents

SdkComponents takes SDK objects and assigns IDs to them. It is effectively a ComponentsBuilder context in which a component is referred to by the Java object being translated, rather than by an opaque string or protocol buffer.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7d7adc8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7d7adc8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7d7adc8

Branch: refs/heads/master
Commit: b7d7adc879694cf5b22f80a26a46982730a483ec
Parents: fc10065
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 7 09:17:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 15:14:34 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml             |   4 +
 .../core/construction/SdkComponents.java           | 168 +++++++++++++++++++
 .../core/construction/SdkComponentsTest.java       | 151 ++++++++++++++++++
 .../beam/sdk/transforms/AppliedPTransform.java     |   2 +
 4 files changed, 325 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 78b6819..ee64f91 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -56,6 +56,10 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
new file mode 100644
index 0000000..c4b8cf1
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/** SDK objects that will be represented at some later point within a {@link Components} object. */
+class SdkComponents {
+  private final RunnerApi.Components.Builder componentsBuilder;
+
+  private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds;
+  private final BiMap<PCollection<?>, String> pCollectionIds;
+  private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds;
+
+  /** A map of Coder to IDs. Coders are stored here with identity equivalence. */
+  private final BiMap<Equivalence.Wrapper<? extends Coder<?>>, String> coderIds;
+  // TODO: Specify environments
+
+  /** Create a new {@link SdkComponents} with no components. */
+  static SdkComponents create() {
+    return new SdkComponents();
+  }
+
+  private SdkComponents() {
+    this.componentsBuilder = RunnerApi.Components.newBuilder();
+    this.transformIds = HashBiMap.create();
+    this.pCollectionIds = HashBiMap.create();
+    this.windowingStrategyIds = HashBiMap.create();
+    this.coderIds = HashBiMap.create();
+  }
+
+  /**
+   * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same
+   * {@link AppliedPTransform} will return the same unique ID.
+   *
+   * <p>The ID is the full name of the transform, or {@code "unnamed_ptransform"} if the full name
+   * is empty, with a numeric suffix appended if needed so that distinct transforms that share a
+   * name still receive distinct IDs.
+   */
+  String registerPTransform(AppliedPTransform<?, ?, ?> pTransform) {
+    String existing = transformIds.get(pTransform);
+    if (existing != null) {
+      return existing;
+    }
+    String baseName = pTransform.getFullName();
+    if (baseName.isEmpty()) {
+      baseName = "unnamed_ptransform";
+    }
+    // Every name goes through uniqify: a BiMap rejects a value already bound to another key, so
+    // registering a second transform with the same full name would otherwise throw.
+    String name = uniqify(baseName, transformIds.values());
+    transformIds.put(pTransform, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique
+   * ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will
+   * return the same unique ID.
+   */
+  String registerPCollection(PCollection<?> pCollection) {
+    String existing = pCollectionIds.get(pCollection);
+    if (existing != null) {
+      return existing;
+    }
+    String uniqueName = uniqify(pCollection.getName(), pCollectionIds.values());
+    pCollectionIds.put(pCollection, uniqueName);
+    return uniqueName;
+  }
+
+  /**
+   * Registers the provided {@link WindowingStrategy} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
+   * WindowingStrategy} will return the same unique ID.
+   */
+  String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+    String existing = windowingStrategyIds.get(windowingStrategy);
+    if (existing != null) {
+      return existing;
+    }
+    String baseName =
+        String.format(
+            "%s(%s)",
+            NameUtils.approximateSimpleName(windowingStrategy),
+            NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
+    String name = uniqify(baseName, windowingStrategyIds.values());
+    windowingStrategyIds.put(windowingStrategy, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link Coder} into this {@link SdkComponents}, returning a unique ID for
+   * the {@link Coder}. Multiple registrations of the same {@link Coder} will return the same
+   * unique ID.
+   *
+   * <p>Coders are stored by identity to ensure that coders with implementations of {@link
+   * Object#equals(Object)} and {@link Object#hashCode()} but incompatible binary formats are not
+   * considered the same coder.
+   */
+  String registerCoder(Coder<?> coder) {
+    String existing = coderIds.get(Equivalence.identity().wrap(coder));
+    if (existing != null) {
+      return existing;
+    }
+    String baseName = NameUtils.approximateSimpleName(coder);
+    String name = uniqify(baseName, coderIds.values());
+    coderIds.put(Equivalence.identity().wrap(coder), name);
+    return name;
+  }
+
+  /**
+   * Returns {@code baseName} if {@code existing} does not contain it. Otherwise returns
+   * {@code baseName} followed by the smallest positive integer for which the resulting name is
+   * not in {@code existing}.
+   */
+  private String uniqify(String baseName, Set<String> existing) {
+    String name = baseName;
+    int increment = 1;
+    while (existing.contains(name)) {
+      name = baseName + Integer.toString(increment);
+      increment++;
+    }
+    return name;
+  }
+
+  /**
+   * Convert this {@link SdkComponents} into a {@link RunnerApi.Components}.
+   *
+   * <p>The {@code register*} methods currently only assign IDs. They do not yet add the
+   * corresponding {@link Coder coders}, {@link WindowingStrategy windowing strategies},
+   * {@link PCollection PCollections}, or {@link PTransform PTransforms} to the underlying
+   * builder, so the returned {@link RunnerApi.Components} is currently empty.
+   */
+  @Experimental
+  RunnerApi.Components toComponents() {
+    // TODO: Add each registered component to componentsBuilder.
+    return componentsBuilder.build();
+  }
+}


http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
new file mode 100644
index 0000000..c96e57c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isEmptyOrNullString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SdkComponents}. */
+@RunWith(JUnit4.class)
+public class SdkComponentsTest {
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private SdkComponents components = SdkComponents.create();
+
+  @Test
+  public void registerCoder() {
+    Coder<?> coder =
+        KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    String id = components.registerCoder(coder);
+    assertThat(components.registerCoder(coder), equalTo(id));
+    assertThat(id, not(isEmptyOrNullString()));
+    assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id)));
+  }
+
+  @Test
+  public void registerTransform() {
+    Create.Values<Integer> create = Create.of(1, 2, 3);
+    PCollection<Integer> pt = pipeline.apply(create);
+    String userName = "my_transform/my_nesting";
+    AppliedPTransform<?, ?, ?> transform =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            userName, pipeline.begin().expand(), pt.expand(), create, pipeline);
+    String componentName = components.registerPTransform(transform);
+    assertThat(componentName, equalTo(userName));
+    assertThat(components.registerPTransform(transform), equalTo(componentName));
+  }
+
+  @Test
+  public void registerTransformIdEmptyFullName() {
+    Create.Values<Integer> create = Create.of(1, 2, 3);
+    PCollection<Integer> pt = pipeline.apply(create);
+    AppliedPTransform<?, ?, ?> transform =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            "", pipeline.begin().expand(), pt.expand(), create, pipeline);
+    String assignedName = components.registerPTransform(transform);
+
+    assertThat(assignedName, not(isEmptyOrNullString()));
+  }
+
+  @Test
+  public void registerTransformNameCollision() {
+    Create.Values<Integer> create = Create.of(1, 2, 3);
+    PCollection<Integer> pt = pipeline.apply("FirstCreate", create);
+    Create.Values<Integer> otherCreate = Create.of(4, 5, 6);
+    PCollection<Integer> otherPt = pipeline.apply("SecondCreate", otherCreate);
+    String userName = "my_transform";
+    AppliedPTransform<?, ?, ?> transform =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            userName, pipeline.begin().expand(), pt.expand(), create, pipeline);
+    AppliedPTransform<?, ?, ?> duplicate =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            userName, pipeline.begin().expand(), otherPt.expand(), otherCreate, pipeline);
+    String firstId = components.registerPTransform(transform);
+    String secondId = components.registerPTransform(duplicate);
+    assertThat(firstId, equalTo(userName));
+    assertThat(secondId, containsString(userName));
+    assertThat(secondId, not(equalTo(userName)));
+  }
+
+  @Test
+  public void registerPCollection() {
+    PCollection<Long> pCollection = pipeline.apply(CountingInput.unbounded()).setName("foo");
+    String id = components.registerPCollection(pCollection);
+    assertThat(id, equalTo("foo"));
+  }
+
+  @Test
+  public void registerPCollectionExistingNameCollision() {
+    PCollection<Long> pCollection =
+        pipeline.apply("FirstCount", CountingInput.unbounded()).setName("foo");
+    String firstId = components.registerPCollection(pCollection);
+    PCollection<Long> duplicate =
+        pipeline.apply("SecondCount", CountingInput.unbounded()).setName("foo");
+    String secondId = components.registerPCollection(duplicate);
+    assertThat(firstId, equalTo("foo"));
+    assertThat(secondId, containsString("foo"));
+    assertThat(secondId, not(equalTo("foo")));
+  }
+
+  @Test
+  public void registerWindowingStrategy() {
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String name = components.registerWindowingStrategy(strategy);
+    assertThat(name, not(isEmptyOrNullString()));
+  }
+
+  @Test
+  public void registerWindowingStrategyIdEqualStrategies() {
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String name = components.registerWindowingStrategy(strategy);
+    String duplicateName =
+        components.registerWindowingStrategy(
+            WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES));
+    assertThat(name, equalTo(duplicateName));
+  }
+}


http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index 4de81ac..e78d795 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.values.TaggedPValue;
 public abstract class AppliedPTransform<
     InputT extends PInput, OutputT extends POutput,
     TransformT extends PTransform<? super InputT, OutputT>> {
+  // To prevent extension outside of this package.
+  AppliedPTransform() {}
 
   public static <InputT extends PInput, OutputT extends POutput,
       TransformT extends PTransform<? super InputT, OutputT>>