Move some PCollectionView bits out of util

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c83cc744
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c83cc744
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c83cc744

Branch: refs/heads/master
Commit: c83cc744a69f735ac134471705e3403b9d5edd34
Parents: b2553ca
Author: Kenneth Knowles <k...@google.com>
Authored: Wed May 3 20:34:54 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu May 4 16:06:56 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   2 +-
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../runners/direct/PCollectionViewWindow.java   |  67 +++
 .../beam/runners/direct/SideInputContainer.java |   1 -
 .../runners/direct/SideInputContainerTest.java  |   2 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   2 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   2 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../flink/FlinkStreamingViewOverrides.java      |   2 +-
 .../runners/dataflow/BatchViewOverrides.java    |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |   2 +-
 .../org/apache/beam/sdk/transforms/View.java    |   2 +-
 .../beam/sdk/util/PCollectionViewWindow.java    |  67 ---
 .../apache/beam/sdk/util/PCollectionViews.java  | 497 -------------------
 .../beam/sdk/values/PCollectionViews.java       | 495 ++++++++++++++++++
 .../beam/sdk/transforms/DoFnTesterTest.java     |   2 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   2 +-
 17 files changed, 574 insertions(+), 577 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 1c845c6..e1828c3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -60,9 +60,9 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.AsIterable;
 import org.apache.beam.sdk.transforms.View.AsSingleton;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.hadoop.conf.Configuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cb28c34..6271234 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -60,13 +60,13 @@ import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
new file mode 100644
index 0000000..7a7d8ff
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PCollectionViewWindow.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
+ * be thought of as window "of" the view. This is a value class for use e.g.
+ * as a compound cache key.
+ *
+ * @param <T> the type of the underlying PCollectionView
+ */
+public final class PCollectionViewWindow<T> {
+
+  private final PCollectionView<T> view;
+  private final BoundedWindow window;
+
+  private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
+    this.view = view;
+    this.window = window;
+  }
+
+  public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
+    return new PCollectionViewWindow<>(view, window);
+  }
+
+  public PCollectionView<T> getView() {
+    return view;
+  }
+
+  public BoundedWindow getWindow() {
+    return window;
+  }
+
+  @Override
+  public boolean equals(Object otherObject) {
+    if (!(otherObject instanceof PCollectionViewWindow)) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
+    return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getView(), getWindow());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 380dc65..43da92f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -39,7 +39,6 @@ import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PCollectionViewWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index d4ca9fd..5e7c799 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -45,11 +45,11 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 9560e94..d8869b2 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index a36787a..eda00a7 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -39,9 +39,9 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.junit.Rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6fffd1a..a2b0c5c 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -54,9 +54,9 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
index f955f2a..ce1c895 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -30,10 +30,10 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 
 /**
  * Flink streaming overrides for various view (side input) transforms.

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 1ff8a3f..debaf59 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -74,7 +74,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -84,6 +83,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 1be948f..666db3b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -57,13 +57,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.NameUtils.NameOverride;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index d17d423..d7b8145 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -25,10 +25,10 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 
 /**
  * Transforms for creating {@link PCollectionView PCollectionViews} from

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
deleted file mode 100644
index 410c8ce..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViewWindow.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Objects;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * A pair of a {@link PCollectionView} and a {@link BoundedWindow}, which can
- * be thought of as window "of" the view. This is a value class for use e.g.
- * as a compound cache key.
- *
- * @param <T> the type of the underlying PCollectionView
- */
-public final class PCollectionViewWindow<T> {
-
-  private final PCollectionView<T> view;
-  private final BoundedWindow window;
-
-  private PCollectionViewWindow(PCollectionView<T> view, BoundedWindow window) {
-    this.view = view;
-    this.window = window;
-  }
-
-  public static <T> PCollectionViewWindow<T> of(PCollectionView<T> view, BoundedWindow window) {
-    return new PCollectionViewWindow<>(view, window);
-  }
-
-  public PCollectionView<T> getView() {
-    return view;
-  }
-
-  public BoundedWindow getWindow() {
-    return window;
-  }
-
-  @Override
-  public boolean equals(Object otherObject) {
-    if (!(otherObject instanceof PCollectionViewWindow)) {
-      return false;
-    }
-    @SuppressWarnings("unchecked")
-    PCollectionViewWindow<T> other = (PCollectionViewWindow<T>) otherObject;
-    return getView().equals(other.getView()) && getWindow().equals(other.getWindow());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getView(), getWindow());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
deleted file mode 100644
index a07bc5e..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Implementations of {@link PCollectionView} shared across the SDK.
- *
- * <p>For internal use only, subject to change.
- */
-public class PCollectionViews {
-
-  /**
-   * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
-   * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
-   *
-   * <p>If {@code hasDefault} is {@code true}, then the view will take on the value
-   * {@code defaultValue} for any empty windows.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      boolean hasDefault,
-      @Nullable T defaultValue,
-      Coder<T> valueCoder) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
-  }
-
-  /**
-   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing 
elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> 
iterableView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        new IterableViewFn<T>(),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
-  }
-
-  /**
-   * Returns a {@code PCollectionView<List<T>>} capable of processing elements 
encoded using the
-   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        new ListViewFn<T>(),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
-  }
-
-  /**
-   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing 
elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
-   */
-  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> 
mapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        new MapViewFn<K, V>(),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
-  }
-
-  /**
-   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of 
processing elements encoded
-   * using the provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
-   */
-  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, 
Iterable<V>>> multimapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        new MultimapViewFn<K, V>(),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
-  }
-
-  /**
-   * Implementation of conversion of singleton {@code 
Iterable<WindowedValue<T>>} to {@code T}.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#singletonView}.
-   *
-   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
-   */
-  @Deprecated
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class SingletonViewFn<T> extends 
ViewFn<Iterable<WindowedValue<T>>, T> {
-    @Nullable private byte[] encodedDefaultValue;
-    @Nullable private transient T defaultValue;
-    @Nullable private Coder<T> valueCoder;
-    private boolean hasDefault;
-
-    private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) {
-      this.hasDefault = hasDefault;
-      this.defaultValue = defaultValue;
-      this.valueCoder = valueCoder;
-      if (hasDefault) {
-        try {
-          this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, defaultValue);
-        } catch (IOException e) {
-          throw new RuntimeException("Unexpected IOException: ", e);
-        }
-      }
-    }
-
-    /**
-     * Returns the default value that was specified.
-     *
-     * <p>For internal use only.
-     *
-     * @throws NoSuchElementException if no default was specified.
-     */
-    public T getDefaultValue() {
-      if (!hasDefault) {
-        throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
-      }
-      // Lazily decode the default value once
-      synchronized (this) {
-        if (encodedDefaultValue != null && defaultValue == null) {
-          try {
-            defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected IOException: ", e);
-          }
-        }
-        return defaultValue;
-      }
-    }
-
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public T apply(Iterable<WindowedValue<T>> contents) {
-      try {
-        return Iterables.getOnlyElement(contents).getValue();
-      } catch (NoSuchElementException exc) {
-        return getDefaultValue();
-      } catch (IllegalArgumentException exc) {
-        throw new IllegalArgumentException(
-            "PCollection with more than one element "
-                + "accessed as a singleton view.");
-      }
-    }
-  }
-
-  /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code 
Iterable<T>}.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#iterableView}.
-   *
-   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
-   */
-  @Deprecated
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class IterableViewFn<T>
-      extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
-      return Iterables.unmodifiableIterable(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-        @SuppressWarnings("unchecked")
-        @Override
-        public T apply(WindowedValue<T> input) {
-          return input.getValue();
-        }
-      }));
-    }
-  }
-
-  /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code 
List<T>}.
-   *
-   * <p>For internal use only.
-   *
-   * <p>Instantiate via {@link PCollectionViews#listView}.
-   *
-   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
-   */
-  @Deprecated
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, 
List<T>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public List<T> apply(Iterable<WindowedValue<T>> contents) {
-      return ImmutableList.copyOf(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public T apply(WindowedValue<T> input) {
-              return input.getValue();
-            }
-          }));
-    }
-  }
-
-  /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
-   * to {@code Map<K, Iterable<V>>}.
-   *
-   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
-   */
-  @Deprecated
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class MultimapViewFn<K, V>
-      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> 
getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> 
elements) {
-      Multimap<K, V> multimap = HashMultimap.create();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        multimap.put(kv.getKey(), kv.getValue());
-      }
-      // Safe covariant cast that Java cannot express without rawtypes, even 
with unchecked casts
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      Map<K, Iterable<V>> resultMap = (Map) multimap.asMap();
-      return Collections.unmodifiableMap(resultMap);
-    }
-  }
-
-  /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} 
with one value per key to
-   * {@code Map<K, V>}.
-   *
-   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
-   */
-  @Deprecated
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class MapViewFn<K, V> extends 
ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> 
getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    /**
-     * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, 
V>>>}.
-     */
-    @Override
-    public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
-      Map<K, V> map = new HashMap<>();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        if (map.containsKey(kv.getKey())) {
-          throw new IllegalArgumentException("Duplicate values for " + 
kv.getKey());
-        }
-        map.put(kv.getKey(), kv.getValue());
-      }
-      return Collections.unmodifiableMap(map);
-    }
-  }
-
-  /**
-   * A class for {@link PCollectionView} implementations, with additional type 
parameters
-   * that are not visible at pipeline assembly time when the view is used as a 
side input.
-   *
-   * <p>For internal use only.
-   */
-  public static class SimplePCollectionView<ElemT, ViewT, W extends 
BoundedWindow>
-      extends PValueBase
-      implements PCollectionView<ViewT> {
-    /** The {@link PCollection} this view was originally created from. */
-    private transient PCollection<ElemT> pCollection;
-
-    /** A unique tag for the view, typed according to the elements underlying 
the view. */
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-
-    private WindowMappingFn<W> windowMappingFn;
-
-    /** The windowing strategy for the PCollection underlying the view. */
-    private WindowingStrategy<?, W> windowingStrategy;
-
-    /** The coder for the elements underlying the view. */
-    private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
-    /**
-     * The typed {@link ViewFn} for this view.
-     */
-    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
-
-    /**
-     * Call this constructor to initialize the fields for which this base 
class provides
-     * boilerplate accessors.
-     */
-    private SimplePCollectionView(
-        PCollection<ElemT> pCollection,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-        WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
-      super(pCollection.getPipeline());
-      this.pCollection = pCollection;
-      if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
-        throw new IllegalArgumentException("WindowFn of PCollectionView cannot 
be InvalidWindows");
-      }
-      this.windowMappingFn = windowMappingFn;
-      this.tag = tag;
-      this.windowingStrategy = windowingStrategy;
-      this.viewFn = viewFn;
-      this.coder =
-          IterableCoder.of(WindowedValue.getFullCoder(
-              valueCoder, windowingStrategy.getWindowFn().windowCoder()));
-    }
-
-    /**
-     * Call this constructor to initialize the fields for which this base 
class provides
-     * boilerplate accessors, with an auto-generated tag.
-     */
-    private SimplePCollectionView(
-        PCollection<ElemT> pCollection,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-        WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
-      this(
-          pCollection,
-          new TupleTag<Iterable<WindowedValue<ElemT>>>(),
-          viewFn,
-          windowMappingFn,
-          windowingStrategy,
-          valueCoder);
-    }
-
-    /**
-     * For serialization only. Do not use directly.
-     */
-    @SuppressWarnings("unused")  // used for serialization
-    protected SimplePCollectionView() {
-      super();
-    }
-
-    @Override
-    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
-      // Safe cast: it is required that the rest of the SDK maintain the 
invariant
-      // that a PCollectionView is only provided an iterable for the elements 
of an
-      // appropriately typed PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) 
viewFn;
-      return untypedViewFn;
-    }
-
-    @Override
-    public WindowMappingFn<?> getWindowMappingFn() {
-      return windowMappingFn;
-    }
-
-    @Override
-    public PCollection<?> getPCollection() {
-      return pCollection;
-    }
-
-    /**
-     * Returns a unique {@link TupleTag} identifying this {@link 
PCollectionView}.
-     *
-     * <p>For internal use only by runner implementors.
-     */
-    @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      // Safe cast: It is required that the rest of the SDK maintain the 
invariant that
-      // this tag is only used to access the contents of an appropriately 
typed underlying
-      // PCollection
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
-      return untypedTag;
-    }
-
-    /**
-     * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, 
which should
-     * be that of the underlying {@link PCollection}.
-     *
-     * <p>For internal use only by runner implementors.
-     */
-    @Override
-    public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
-      return windowingStrategy;
-    }
-
-    @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      // Safe cast: It is required that the rest of the SDK only use this 
untyped coder
-      // for the elements of an appropriately typed underlying PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
-      return untypedCoder;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(tag);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof PCollectionView)) {
-        return false;
-      }
-      @SuppressWarnings("unchecked")
-      PCollectionView<?> otherView = (PCollectionView<?>) other;
-      return tag.equals(otherView.getTagInternal());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this).add("tag", tag).toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
new file mode 100644
index 0000000..74887c7
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Implementations of {@link PCollectionView} shared across the SDK.
+ */
+@Internal
+public class PCollectionViews {
+
+  /**
+   * Returns a {@code PCollectionView<T>} capable of processing elements 
encoded using the provided
+   * {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   *
+   * <p>If {@code hasDefault} is {@code true}, then the view will take on the 
value
+   * {@code defaultValue} for any empty windows.
+   */
+  public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
+      PCollection<T> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      boolean hasDefault,
+      @Nullable T defaultValue,
+      Coder<T> valueCoder) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
+  }
+
+  /**
+   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing 
elements encoded using the
+   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
+   */
+  public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> 
iterableView(
+      PCollection<T> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      Coder<T> valueCoder) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        new IterableViewFn<T>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
+  }
+
+  /**
+   * Returns a {@code PCollectionView<List<T>>} capable of processing elements 
encoded using the
+   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
+   */
+  public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
+      PCollection<T> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      Coder<T> valueCoder) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        new ListViewFn<T>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
+  }
+
+  /**
+   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing 
elements encoded using the
+   * provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
+   */
+  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> 
mapView(
+      PCollection<KV<K, V>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      Coder<KV<K, V>> valueCoder) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        new MapViewFn<K, V>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
+  }
+
+  /**
+   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of 
processing elements encoded
+   * using the provided {@link Coder} and windowed using the provided {@link 
WindowingStrategy}.
+   */
+  public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, 
Iterable<V>>> multimapView(
+      PCollection<KV<K, V>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      Coder<KV<K, V>> valueCoder) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        new MultimapViewFn<K, V>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
+  }
+
+  /**
+   * Implementation of conversion of singleton {@code 
Iterable<WindowedValue<T>>} to {@code T}.
+   *
+   * <p>For internal use only.
+   *
+   * <p>Instantiate via {@link PCollectionViews#singletonView}.
+   *
+   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
+   *     view type.
+   */
+  @Deprecated
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static class SingletonViewFn<T> extends 
ViewFn<Iterable<WindowedValue<T>>, T> {
+    @Nullable private byte[] encodedDefaultValue;
+    @Nullable private transient T defaultValue;
+    @Nullable private Coder<T> valueCoder;
+    private boolean hasDefault;
+
+    private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> 
valueCoder) {
+      this.hasDefault = hasDefault;
+      this.defaultValue = defaultValue;
+      this.valueCoder = valueCoder;
+      if (hasDefault) {
+        try {
+          this.encodedDefaultValue = CoderUtils.encodeToByteArray(valueCoder, 
defaultValue);
+        } catch (IOException e) {
+          throw new RuntimeException("Unexpected IOException: ", e);
+        }
+      }
+    }
+
+    /**
+     * Returns the default value that was specified.
+     *
+     * <p>For internal use only.
+     *
+     * @throws NoSuchElementException if no default was specified.
+     */
+    public T getDefaultValue() {
+      if (!hasDefault) {
+        throw new NoSuchElementException("Empty PCollection accessed as a 
singleton view.");
+      }
+      // Lazily decode the default value once
+      synchronized (this) {
+        if (encodedDefaultValue != null && defaultValue == null) {
+          try {
+            defaultValue = CoderUtils.decodeFromByteArray(valueCoder, 
encodedDefaultValue);
+          } catch (IOException e) {
+            throw new RuntimeException("Unexpected IOException: ", e);
+          }
+        }
+        return defaultValue;
+      }
+    }
+
+    @Override
+    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+      return Materializations.iterable();
+    }
+
+    @Override
+    public T apply(Iterable<WindowedValue<T>> contents) {
+      try {
+        return Iterables.getOnlyElement(contents).getValue();
+      } catch (NoSuchElementException exc) {
+        return getDefaultValue();
+      } catch (IllegalArgumentException exc) {
+        throw new IllegalArgumentException(
+            "PCollection with more than one element "
+                + "accessed as a singleton view.");
+      }
+    }
+  }
+
+  /**
+   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code 
Iterable<T>}.
+   *
+   * <p>For internal use only.
+   *
+   * <p>Instantiate via {@link PCollectionViews#iterableView}.
+   *
+   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
+   *     view type.
+   */
+  @Deprecated
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static class IterableViewFn<T>
+      extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+    @Override
+    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+      return Materializations.iterable();
+    }
+
+    @Override
+    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
+      return Iterables.unmodifiableIterable(
+          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public T apply(WindowedValue<T> input) {
+          return input.getValue();
+        }
+      }));
+    }
+  }
+
+  /**
+   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code 
List<T>}.
+   *
+   * <p>For internal use only.
+   *
+   * <p>Instantiate via {@link PCollectionViews#listView}.
+   *
+   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
+   *     view type.
+   */
+  @Deprecated
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, 
List<T>> {
+    @Override
+    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
+      return Materializations.iterable();
+    }
+
+    @Override
+    public List<T> apply(Iterable<WindowedValue<T>> contents) {
+      return ImmutableList.copyOf(
+          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public T apply(WindowedValue<T> input) {
+              return input.getValue();
+            }
+          }));
+    }
+  }
+
+  /**
+   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
+   * to {@code Map<K, Iterable<V>>}.
+   *
+   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
+   *     view type.
+   */
+  @Deprecated
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static class MultimapViewFn<K, V>
+      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+    @Override
+    public Materialization<Iterable<WindowedValue<KV<K, V>>>> 
getMaterialization() {
+      return Materializations.iterable();
+    }
+
+    @Override
+    public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> 
elements) {
+      Multimap<K, V> multimap = HashMultimap.create();
+      for (WindowedValue<KV<K, V>> elem : elements) {
+        KV<K, V> kv = elem.getValue();
+        multimap.put(kv.getKey(), kv.getValue());
+      }
+      // Safe covariant cast that Java cannot express without rawtypes, even 
with unchecked casts
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Map<K, Iterable<V>> resultMap = (Map) multimap.asMap();
+      return Collections.unmodifiableMap(resultMap);
+    }
+  }
+
+  /**
+   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} 
with one value per key to
+   * {@code Map<K, V>}.
+   *
+   * @deprecated Beam views are migrating off of {@code 
Iterable<WindowedValue<T>>} as a primitive
+   *     view type.
+   */
+  @Deprecated
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  public static class MapViewFn<K, V> extends 
ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+    @Override
+    public Materialization<Iterable<WindowedValue<KV<K, V>>>> 
getMaterialization() {
+      return Materializations.iterable();
+    }
+
+    /**
+     * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, 
V>>>}.
+     */
+    @Override
+    public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+      Map<K, V> map = new HashMap<>();
+      for (WindowedValue<KV<K, V>> elem : elements) {
+        KV<K, V> kv = elem.getValue();
+        if (map.containsKey(kv.getKey())) {
+          throw new IllegalArgumentException("Duplicate values for " + 
kv.getKey());
+        }
+        map.put(kv.getKey(), kv.getValue());
+      }
+      return Collections.unmodifiableMap(map);
+    }
+  }
+
+  /**
+   * A class for {@link PCollectionView} implementations, with additional type 
parameters
+   * that are not visible at pipeline assembly time when the view is used as a 
side input.
+   *
+   * <p>For internal use only.
+   */
+  public static class SimplePCollectionView<ElemT, ViewT, W extends 
BoundedWindow>
+      extends PValueBase
+      implements PCollectionView<ViewT> {
+    /** The {@link PCollection} this view was originally created from. */
+    private transient PCollection<ElemT> pCollection;
+
+    /** A unique tag for the view, typed according to the elements underlying 
the view. */
+    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+
+    private WindowMappingFn<W> windowMappingFn;
+
+    /** The windowing strategy for the PCollection underlying the view. */
+    private WindowingStrategy<?, W> windowingStrategy;
+
+    /** The coder for the elements underlying the view. */
+    private Coder<Iterable<WindowedValue<ElemT>>> coder;
+
+    /**
+     * The typed {@link ViewFn} for this view.
+     */
+    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+
+    /**
+     * Call this constructor to initialize the fields for which this base 
class provides
+     * boilerplate accessors.
+     */
+    private SimplePCollectionView(
+        PCollection<ElemT> pCollection,
+        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
+        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        WindowMappingFn<W> windowMappingFn,
+        WindowingStrategy<?, W> windowingStrategy,
+        Coder<ElemT> valueCoder) {
+      super(pCollection.getPipeline());
+      this.pCollection = pCollection;
+      if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
+        throw new IllegalArgumentException("WindowFn of PCollectionView cannot 
be InvalidWindows");
+      }
+      this.windowMappingFn = windowMappingFn;
+      this.tag = tag;
+      this.windowingStrategy = windowingStrategy;
+      this.viewFn = viewFn;
+      this.coder =
+          IterableCoder.of(WindowedValue.getFullCoder(
+              valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+    }
+
+    /**
+     * Call this constructor to initialize the fields for which this base 
class provides
+     * boilerplate accessors, with an auto-generated tag.
+     */
+    private SimplePCollectionView(
+        PCollection<ElemT> pCollection,
+        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        WindowMappingFn<W> windowMappingFn,
+        WindowingStrategy<?, W> windowingStrategy,
+        Coder<ElemT> valueCoder) {
+      this(
+          pCollection,
+          new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+          viewFn,
+          windowMappingFn,
+          windowingStrategy,
+          valueCoder);
+    }
+
+    /**
+     * For serialization only. Do not use directly.
+     */
+    @SuppressWarnings("unused")  // used for serialization
+    protected SimplePCollectionView() {
+      super();
+    }
+
+    @Override
+    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
+      // Safe cast: it is required that the rest of the SDK maintain the 
invariant
+      // that a PCollectionView is only provided an iterable for the elements 
of an
+      // appropriately typed PCollection.
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) 
viewFn;
+      return untypedViewFn;
+    }
+
+    @Override
+    public WindowMappingFn<?> getWindowMappingFn() {
+      return windowMappingFn;
+    }
+
+    @Override
+    public PCollection<?> getPCollection() {
+      return pCollection;
+    }
+
+    /**
+     * Returns a unique {@link TupleTag} identifying this {@link 
PCollectionView}.
+     *
+     * <p>For internal use only by runner implementors.
+     */
+    @Override
+    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
+      // Safe cast: It is required that the rest of the SDK maintain the 
invariant that
+      // this tag is only used to access the contents of an appropriately 
typed underlying
+      // PCollection
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
+      return untypedTag;
+    }
+
+    /**
+     * Returns the {@link WindowingStrategy} of this {@link PCollectionView}, 
which should
+     * be that of the underlying {@link PCollection}.
+     *
+     * <p>For internal use only by runner implementors.
+     */
+    @Override
+    public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
+      return windowingStrategy;
+    }
+
+    @Override
+    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
+      // Safe cast: It is required that the rest of the SDK only use this 
untyped coder
+      // for the elements of an appropriately typed underlying PCollection.
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
+      return untypedCoder;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tag);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof PCollectionView)) {
+        return false;
+      }
+      @SuppressWarnings("unchecked")
+      PCollectionView<?> otherView = (PCollectionView<?>) other;
+      return tag.equals(otherView.getTagInternal());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("tag", tag).toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index f74d673..1bb71bb 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;

http://git-wip-us.apache.org/repos/asf/beam/blob/c83cc744/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 38da2d9..a3b21ee 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -121,11 +121,11 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;

Reply via email to