Move some PCollectionView bits out of util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c83cc744 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c83cc744 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c83cc744 Branch: refs/heads/master Commit: c83cc744a69f735ac134471705e3403b9d5edd34 Parents: b2553ca Author: Kenneth Knowles <k...@google.com> Authored: Wed May 3 20:34:54 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu May 4 16:06:56 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 2 +- .../construction/PTransformMatchersTest.java | 2 +- .../runners/direct/PCollectionViewWindow.java | 67 +++ .../beam/runners/direct/SideInputContainer.java | 1 - .../runners/direct/SideInputContainerTest.java | 2 +- .../direct/ViewEvaluatorFactoryTest.java | 2 +- .../runners/direct/ViewOverrideFactoryTest.java | 2 +- .../direct/WriteWithShardingFactoryTest.java | 2 +- .../flink/FlinkStreamingViewOverrides.java | 2 +- .../runners/dataflow/BatchViewOverrides.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 2 +- .../org/apache/beam/sdk/transforms/View.java | 2 +- .../beam/sdk/util/PCollectionViewWindow.java | 67 --- .../apache/beam/sdk/util/PCollectionViews.java | 497 ------------------- .../beam/sdk/values/PCollectionViews.java | 495 ++++++++++++++++++ .../beam/sdk/transforms/DoFnTesterTest.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +- 17 files changed, 574 insertions(+), 577 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 1c845c6..e1828c3 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -60,9 +60,9 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.AsIterable; import org.apache.beam.sdk.transforms.View.AsSingleton; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.hadoop.conf.Configuration; /** http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index cb28c34..6271234 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -60,13 +60,13 @@ import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java new file mode 100644 index 0000000..7a7d8ff --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import java.util.Objects; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can + * be thought of as window "of" the view. This is a value class for use e.g. + * as a compound cache key. + * + * @param <T> the type of the underlying PCollectionView + */ +public final class PCollectionViewWindow<T> { + + private final PCollectionView<T> view; + private final BoundedWindow window; + + private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) { + this.view = view; + this.window = window; + } + + public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) { + return new PCollectionViewWindow<>(view, window); + } + + public PCollectionView<T> getView() { + return view; + } + + public BoundedWindow getWindow() { + return window; + } + + @Override + public boolean equals(Object otherObject) { + if (!(otherObject instanceof PCollectionViewWindow)) { + return false; + } + @SuppressWarnings("unchecked") + PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject; + return getView().equals(other.getView()) && getWindow().equals(other.getWindow()); + } + + @Override + public int hashCode() { + return Objects.hash(getView(), getWindow()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 380dc65..43da92f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -39,7 +39,6 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.PCollectionViewWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java index d4ca9fd..5e7c799 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java @@ -45,11 +45,11 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 9560e94..d8869b2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -34,10 +34,10 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java index a36787a..eda00a7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -39,9 +39,9 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 6fffd1a..a2b0c5c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -54,9 +54,9 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java index f955f2a..ce1c895 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java @@ -30,10 +30,10 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; /** * Flink streaming overrides for various view (side input) transforms. http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 1ff8a3f..debaf59 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -74,7 +74,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -84,6 +83,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 1be948f..666db3b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -57,13 +57,13 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.NameUtils.NameOverride; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index d17d423..d7b8145 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -25,10 +25,10 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; /** * Transforms for creating {@link PCollectionView PCollectionViews} from http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java deleted file mode 100644 index 410c8ce..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.Objects; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can - * be thought of as window "of" the view. This is a value class for use e.g. - * as a compound cache key. - * - * @param <T> the type of the underlying PCollectionView - */ -public final class PCollectionViewWindow<T> { - - private final PCollectionView<T> view; - private final BoundedWindow window; - - private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) { - this.view = view; - this.window = window; - } - - public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) { - return new PCollectionViewWindow<>(view, window); - } - - public PCollectionView<T> getView() { - return view; - } - - public BoundedWindow getWindow() { - return window; - } - - @Override - public boolean equals(Object otherObject) { - if (!(otherObject instanceof PCollectionViewWindow)) { - return false; - } - @SuppressWarnings("unchecked") - PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject; - return getView().equals(other.getView()) && getWindow().equals(other.getWindow()); - } - - @Override - public int hashCode() { - return Objects.hash(getView(), getWindow()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java deleted file mode 100644 index a07bc5e..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.base.Function; -import com.google.common.base.MoreObjects; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.transforms.Materialization; -import org.apache.beam.sdk.transforms.Materializations; -import org.apache.beam.sdk.transforms.ViewFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.InvalidWindows; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * Implementations of {@link PCollectionView} shared across the SDK. - * - * <p>For internal use only, subject to change. - */ -public class PCollectionViews { - - /** - * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided - * {@link Coder} and windowed using the provided * {@link WindowingStrategy}. - * - * <p>If {@code hasDefault} is {@code true}, then the view will take on the value - * {@code defaultValue} for any empty windows. - */ - public static <T, W extends BoundedWindow> PCollectionView<T> singletonView( - PCollection<T> pCollection, - WindowingStrategy<?, W> windowingStrategy, - boolean hasDefault, - @Nullable T defaultValue, - Coder<T> valueCoder) { - return new SimplePCollectionView<>( - pCollection, - new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); - } - - /** - * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. - */ - public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView( - PCollection<T> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<T> valueCoder) { - return new SimplePCollectionView<>( - pCollection, - new IterableViewFn<T>(), - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); - } - - /** - * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. - */ - public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView( - PCollection<T> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<T> valueCoder) { - return new SimplePCollectionView<>( - pCollection, - new ListViewFn<T>(), - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); - } - - /** - * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. - */ - public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView( - PCollection<KV<K, V>> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { - return new SimplePCollectionView<>( - pCollection, - new MapViewFn<K, V>(), - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); - } - - /** - * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded - * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. - */ - public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView( - PCollection<KV<K, V>> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { - return new SimplePCollectionView<>( - pCollection, - new MultimapViewFn<K, V>(), - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); - } - - /** - * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}. - * - * <p>For internal use only. - * - * <p>Instantiate via {@link PCollectionViews#singletonView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. - */ - @Deprecated - @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> { - @Nullable private byte[] encodedDefaultValue; - @Nullable private transient T defaultValue; - @Nullable private Coder<T> valueCoder; - private boolean hasDefault; - - private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) { - this.hasDefault = hasDefault; - this.defaultValue = defaultValue; - this.valueCoder = valueCoder; - if (hasDefault) { - try { - this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue); - } catch (IOException e) { - throw new RuntimeException("Unexpected IOException: ", e); - } - } - } - - /** - * Returns the default value that was specified. - * - * <p>For internal use only. - * - * @throws NoSuchElementException if no default was specified. - */ - public T getDefaultValue() { - if (!hasDefault) { - throw new NoSuchElementException("Empty PCollection accessed as a singleton view."); - } - // Lazily decode the default value once - synchronized (this) { - if (encodedDefaultValue != null && defaultValue == null) { - try { - defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue); - } catch (IOException e) { - throw new RuntimeException("Unexpected IOException: ", e); - } - } - return defaultValue; - } - } - - @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public T apply(Iterable<WindowedValue<T>> contents) { - try { - return Iterables.getOnlyElement(contents).getValue(); - } catch (NoSuchElementException exc) { - return getDefaultValue(); - } catch (IllegalArgumentException exc) { - throw new IllegalArgumentException( - "PCollection with more than one element " - + "accessed as a singleton view."); - } - } - } - - /** - * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}. - * - * <p>For internal use only. - * - * <p>Instantiate via {@link PCollectionViews#iterableView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. - */ - @Deprecated - @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class IterableViewFn<T> - extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { - @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { - return Iterables.unmodifiableIterable( - Iterables.transform(contents, new Function<WindowedValue<T>, T>() { - @SuppressWarnings("unchecked") - @Override - public T apply(WindowedValue<T> input) { - return input.getValue(); - } - })); - } - } - - /** - * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}. - * - * <p>For internal use only. - * - * <p>Instantiate via {@link PCollectionViews#listView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. - */ - @Deprecated - @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> { - @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public List<T> apply(Iterable<WindowedValue<T>> contents) { - return ImmutableList.copyOf( - Iterables.transform(contents, new Function<WindowedValue<T>, T>() { - @SuppressWarnings("unchecked") - @Override - public T apply(WindowedValue<T> input) { - return input.getValue(); - } - })); - } - } - - /** - * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} - * to {@code Map<K, Iterable<V>>}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. - */ - @Deprecated - @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class MultimapViewFn<K, V> - extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> { - @Override - public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) { - Multimap<K, V> multimap = HashMultimap.create(); - for (WindowedValue<KV<K, V>> elem : elements) { - KV<K, V> kv = elem.getValue(); - multimap.put(kv.getKey(), kv.getValue()); - } - // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts - @SuppressWarnings({"unchecked", "rawtypes"}) - Map<K, Iterable<V>> resultMap = (Map) multimap.asMap(); - return Collections.unmodifiableMap(resultMap); - } - } - - /** - * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to - * {@code Map<K, V>}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. - */ - @Deprecated - @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> { - @Override - public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { - return Materializations.iterable(); - } - - /** - * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}. - */ - @Override - public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) { - Map<K, V> map = new HashMap<>(); - for (WindowedValue<KV<K, V>> elem : elements) { - KV<K, V> kv = elem.getValue(); - if (map.containsKey(kv.getKey())) { - throw new IllegalArgumentException("Duplicate values for " + kv.getKey()); - } - map.put(kv.getKey(), kv.getValue()); - } - return Collections.unmodifiableMap(map); - } - } - - /** - * A class for {@link PCollectionView} implementations, with additional type parameters - * that are not visible at pipeline assembly time when the view is used as a side input. - * - * <p>For internal use only. - */ - public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow> - extends PValueBase - implements PCollectionView<ViewT> { - /** The {@link PCollection} this view was originally created from. */ - private transient PCollection<ElemT> pCollection; - - /** A unique tag for the view, typed according to the elements underlying the view. */ - private TupleTag<Iterable<WindowedValue<ElemT>>> tag; - - private WindowMappingFn<W> windowMappingFn; - - /** The windowing strategy for the PCollection underlying the view. */ - private WindowingStrategy<?, W> windowingStrategy; - - /** The coder for the elements underlying the view. */ - private Coder<Iterable<WindowedValue<ElemT>>> coder; - - /** - * The typed {@link ViewFn} for this view. - */ - private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; - - /** - * Call this constructor to initialize the fields for which this base class provides - * boilerplate accessors. - */ - private SimplePCollectionView( - PCollection<ElemT> pCollection, - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - WindowMappingFn<W> windowMappingFn, - WindowingStrategy<?, W> windowingStrategy, - Coder<ElemT> valueCoder) { - super(pCollection.getPipeline()); - this.pCollection = pCollection; - if (windowingStrategy.getWindowFn() instanceof InvalidWindows) { - throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows"); - } - this.windowMappingFn = windowMappingFn; - this.tag = tag; - this.windowingStrategy = windowingStrategy; - this.viewFn = viewFn; - this.coder = - IterableCoder.of(WindowedValue.getFullCoder( - valueCoder, windowingStrategy.getWindowFn().windowCoder())); - } - - /** - * Call this constructor to initialize the fields for which this base class provides - * boilerplate accessors, with an auto-generated tag. - */ - private SimplePCollectionView( - PCollection<ElemT> pCollection, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - WindowMappingFn<W> windowMappingFn, - WindowingStrategy<?, W> windowingStrategy, - Coder<ElemT> valueCoder) { - this( - pCollection, - new TupleTag<Iterable<WindowedValue<ElemT>>>(), - viewFn, - windowMappingFn, - windowingStrategy, - valueCoder); - } - - /** - * For serialization only. Do not use directly. - */ - @SuppressWarnings("unused") // used for serialization - protected SimplePCollectionView() { - super(); - } - - @Override - public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { - // Safe cast: it is required that the rest of the SDK maintain the invariant - // that a PCollectionView is only provided an iterable for the elements of an - // appropriately typed PCollection. - @SuppressWarnings({"rawtypes", "unchecked"}) - ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; - return untypedViewFn; - } - - @Override - public WindowMappingFn<?> getWindowMappingFn() { - return windowMappingFn; - } - - @Override - public PCollection<?> getPCollection() { - return pCollection; - } - - /** - * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. - */ - @Override - public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { - // Safe cast: It is required that the rest of the SDK maintain the invariant that - // this tag is only used to access the contents of an appropriately typed underlying - // PCollection - @SuppressWarnings({"rawtypes", "unchecked"}) - TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag; - return untypedTag; - } - - /** - * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should - * be that of the underlying {@link PCollection}. - * - * <p>For internal use only by runner implementors. - */ - @Override - public WindowingStrategy<?, ?> getWindowingStrategyInternal() { - return windowingStrategy; - } - - @Override - public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { - // Safe cast: It is required that the rest of the SDK only use this untyped coder - // for the elements of an appropriately typed underlying PCollection. - @SuppressWarnings({"rawtypes", "unchecked"}) - Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder; - return untypedCoder; - } - - @Override - public int hashCode() { - return Objects.hash(tag); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof PCollectionView)) { - return false; - } - @SuppressWarnings("unchecked") - PCollectionView<?> otherView = (PCollectionView<?>) other; - return tag.equals(otherView.getTagInternal()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("tag", tag).toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java new file mode 100644 index 0000000..74887c7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import com.google.common.base.Function; +import com.google.common.base.MoreObjects; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.transforms.Materialization; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * <b>For internal use only; no backwards compatibility guarantees.</b> + * + * <p>Implementations of {@link PCollectionView} shared across the SDK. + */ +@Internal +public class PCollectionViews { + + /** + * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided + * {@link Coder} and windowed using the provided * {@link WindowingStrategy}. + * + * <p>If {@code hasDefault} is {@code true}, then the view will take on the value + * {@code defaultValue} for any empty windows. + */ + public static <T, W extends BoundedWindow> PCollectionView<T> singletonView( + PCollection<T> pCollection, + WindowingStrategy<?, W> windowingStrategy, + boolean hasDefault, + @Nullable T defaultValue, + Coder<T> valueCoder) { + return new SimplePCollectionView<>( + pCollection, + new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); + } + + /** + * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the + * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + */ + public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView( + PCollection<T> pCollection, + WindowingStrategy<?, W> windowingStrategy, + Coder<T> valueCoder) { + return new SimplePCollectionView<>( + pCollection, + new IterableViewFn<T>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); + } + + /** + * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the + * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + */ + public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView( + PCollection<T> pCollection, + WindowingStrategy<?, W> windowingStrategy, + Coder<T> valueCoder) { + return new SimplePCollectionView<>( + pCollection, + new ListViewFn<T>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); + } + + /** + * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the + * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + */ + public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView( + PCollection<KV<K, V>> pCollection, + WindowingStrategy<?, W> windowingStrategy, + Coder<KV<K, V>> valueCoder) { + return new SimplePCollectionView<>( + pCollection, + new MapViewFn<K, V>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); + } + + /** + * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded + * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + */ + public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView( + PCollection<KV<K, V>> pCollection, + WindowingStrategy<?, W> windowingStrategy, + Coder<KV<K, V>> valueCoder) { + return new SimplePCollectionView<>( + pCollection, + new MultimapViewFn<K, V>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); + } + + /** + * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}. + * + * <p>For internal use only. + * + * <p>Instantiate via {@link PCollectionViews#singletonView}. + * + * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive + * view type. + */ + @Deprecated + @Experimental(Kind.CORE_RUNNERS_ONLY) + public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> { + @Nullable private byte[] encodedDefaultValue; + @Nullable private transient T defaultValue; + @Nullable private Coder<T> valueCoder; + private boolean hasDefault; + + private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) { + this.hasDefault = hasDefault; + this.defaultValue = defaultValue; + this.valueCoder = valueCoder; + if (hasDefault) { + try { + this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue); + } catch (IOException e) { + throw new RuntimeException("Unexpected IOException: ", e); + } + } + } + + /** + * Returns the default value that was specified. + * + * <p>For internal use only. + * + * @throws NoSuchElementException if no default was specified. + */ + public T getDefaultValue() { + if (!hasDefault) { + throw new NoSuchElementException("Empty PCollection accessed as a singleton view."); + } + // Lazily decode the default value once + synchronized (this) { + if (encodedDefaultValue != null && defaultValue == null) { + try { + defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue); + } catch (IOException e) { + throw new RuntimeException("Unexpected IOException: ", e); + } + } + return defaultValue; + } + } + + @Override + public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { + return Materializations.iterable(); + } + + @Override + public T apply(Iterable<WindowedValue<T>> contents) { + try { + return Iterables.getOnlyElement(contents).getValue(); + } catch (NoSuchElementException exc) { + return getDefaultValue(); + } catch (IllegalArgumentException exc) { + throw new IllegalArgumentException( + "PCollection with more than one element " + + "accessed as a singleton view."); + } + } + } + + /** + * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}. + * + * <p>For internal use only. + * + * <p>Instantiate via {@link PCollectionViews#iterableView}. + * + * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive + * view type. + */ + @Deprecated + @Experimental(Kind.CORE_RUNNERS_ONLY) + public static class IterableViewFn<T> + extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { + @Override + public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { + return Materializations.iterable(); + } + + @Override + public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { + return Iterables.unmodifiableIterable( + Iterables.transform(contents, new Function<WindowedValue<T>, T>() { + @SuppressWarnings("unchecked") + @Override + public T apply(WindowedValue<T> input) { + return input.getValue(); + } + })); + } + } + + /** + * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}. + * + * <p>For internal use only. + * + * <p>Instantiate via {@link PCollectionViews#listView}. + * + * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive + * view type. + */ + @Deprecated + @Experimental(Kind.CORE_RUNNERS_ONLY) + public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> { + @Override + public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { + return Materializations.iterable(); + } + + @Override + public List<T> apply(Iterable<WindowedValue<T>> contents) { + return ImmutableList.copyOf( + Iterables.transform(contents, new Function<WindowedValue<T>, T>() { + @SuppressWarnings("unchecked") + @Override + public T apply(WindowedValue<T> input) { + return input.getValue(); + } + })); + } + } + + /** + * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} + * to {@code Map<K, Iterable<V>>}. + * + * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive + * view type. + */ + @Deprecated + @Experimental(Kind.CORE_RUNNERS_ONLY) + public static class MultimapViewFn<K, V> + extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> { + @Override + public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { + return Materializations.iterable(); + } + + @Override + public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) { + Multimap<K, V> multimap = HashMultimap.create(); + for (WindowedValue<KV<K, V>> elem : elements) { + KV<K, V> kv = elem.getValue(); + multimap.put(kv.getKey(), kv.getValue()); + } + // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts + @SuppressWarnings({"unchecked", "rawtypes"}) + Map<K, Iterable<V>> resultMap = (Map) multimap.asMap(); + return Collections.unmodifiableMap(resultMap); + } + } + + /** + * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to + * {@code Map<K, V>}. + * + * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive + * view type. + */ + @Deprecated + @Experimental(Kind.CORE_RUNNERS_ONLY) + public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> { + @Override + public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { + return Materializations.iterable(); + } + + /** + * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}. + */ + @Override + public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) { + Map<K, V> map = new HashMap<>(); + for (WindowedValue<KV<K, V>> elem : elements) { + KV<K, V> kv = elem.getValue(); + if (map.containsKey(kv.getKey())) { + throw new IllegalArgumentException("Duplicate values for " + kv.getKey()); + } + map.put(kv.getKey(), kv.getValue()); + } + return Collections.unmodifiableMap(map); + } + } + + /** + * A class for {@link PCollectionView} implementations, with additional type parameters + * that are not visible at pipeline assembly time when the view is used as a side input. + * + * <p>For internal use only. + */ + public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow> + extends PValueBase + implements PCollectionView<ViewT> { + /** The {@link PCollection} this view was originally created from. */ + private transient PCollection<ElemT> pCollection; + + /** A unique tag for the view, typed according to the elements underlying the view. */ + private TupleTag<Iterable<WindowedValue<ElemT>>> tag; + + private WindowMappingFn<W> windowMappingFn; + + /** The windowing strategy for the PCollection underlying the view. */ + private WindowingStrategy<?, W> windowingStrategy; + + /** The coder for the elements underlying the view. */ + private Coder<Iterable<WindowedValue<ElemT>>> coder; + + /** + * The typed {@link ViewFn} for this view. + */ + private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; + + /** + * Call this constructor to initialize the fields for which this base class provides + * boilerplate accessors. + */ + private SimplePCollectionView( + PCollection<ElemT> pCollection, + TupleTag<Iterable<WindowedValue<ElemT>>> tag, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<W> windowMappingFn, + WindowingStrategy<?, W> windowingStrategy, + Coder<ElemT> valueCoder) { + super(pCollection.getPipeline()); + this.pCollection = pCollection; + if (windowingStrategy.getWindowFn() instanceof InvalidWindows) { + throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows"); + } + this.windowMappingFn = windowMappingFn; + this.tag = tag; + this.windowingStrategy = windowingStrategy; + this.viewFn = viewFn; + this.coder = + IterableCoder.of(WindowedValue.getFullCoder( + valueCoder, windowingStrategy.getWindowFn().windowCoder())); + } + + /** + * Call this constructor to initialize the fields for which this base class provides + * boilerplate accessors, with an auto-generated tag. + */ + private SimplePCollectionView( + PCollection<ElemT> pCollection, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<W> windowMappingFn, + WindowingStrategy<?, W> windowingStrategy, + Coder<ElemT> valueCoder) { + this( + pCollection, + new TupleTag<Iterable<WindowedValue<ElemT>>>(), + viewFn, + windowMappingFn, + windowingStrategy, + valueCoder); + } + + /** + * For serialization only. Do not use directly. + */ + @SuppressWarnings("unused") // used for serialization + protected SimplePCollectionView() { + super(); + } + + @Override + public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { + // Safe cast: it is required that the rest of the SDK maintain the invariant + // that a PCollectionView is only provided an iterable for the elements of an + // appropriately typed PCollection. + @SuppressWarnings({"rawtypes", "unchecked"}) + ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; + return untypedViewFn; + } + + @Override + public WindowMappingFn<?> getWindowMappingFn() { + return windowMappingFn; + } + + @Override + public PCollection<?> getPCollection() { + return pCollection; + } + + /** + * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. + */ + @Override + public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { + // Safe cast: It is required that the rest of the SDK maintain the invariant that + // this tag is only used to access the contents of an appropriately typed underlying + // PCollection + @SuppressWarnings({"rawtypes", "unchecked"}) + TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag; + return untypedTag; + } + + /** + * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, which should + * be that of the underlying {@link PCollection}. + * + * <p>For internal use only by runner implementors. + */ + @Override + public WindowingStrategy<?, ?> getWindowingStrategyInternal() { + return windowingStrategy; + } + + @Override + public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { + // Safe cast: It is required that the rest of the SDK only use this untyped coder + // for the elements of an appropriately typed underlying PCollection. + @SuppressWarnings({"rawtypes", "unchecked"}) + Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder; + return untypedCoder; + } + + @Override + public int hashCode() { + return Objects.hash(tag); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PCollectionView)) { + return false; + } + @SuppressWarnings("unchecked") + PCollectionView<?> otherView = (PCollectionView<?>) other; + return tag.equals(otherView.getTagInternal()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("tag", tag).toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index f74d673..1bb71bb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 38da2d9..a3b21ee 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -121,11 +121,11 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow;