Repository: incubator-beam
Updated Branches:
  refs/heads/master 9abd0bc8a -> 95ab43809


[BEAM-589] Fixing IO.Read transformation: root PTransforms take PBegin instead of PInput


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/310ea749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/310ea749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/310ea749

Branch: refs/heads/master
Commit: 310ea7497a151d1a9567f3e9a3b18e54ddcdc7f0
Parents: 9abd0bc
Author: gaurav gupta <gaugu...@cisco.com>
Authored: Thu Aug 25 14:00:06 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 26 15:14:11 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/examples/complete/TfIdf.java   |  6 +++---
 .../runners/core/UnboundedReadFromBoundedSource.java    |  6 +++---
 .../org/apache/beam/runners/flink/examples/TFIDF.java   |  6 +++---
 .../apache/beam/runners/dataflow/DataflowRunner.java    |  6 +++---
 .../DataflowUnboundedReadFromBoundedSource.java         |  6 +++---
 .../beam/runners/dataflow/DataflowRunnerTest.java       |  4 ++--
 .../org/apache/beam/runners/spark/io/CreateStream.java  |  7 +++----
 .../java/org/apache/beam/runners/spark/io/KafkaIO.java  |  6 +++---
 .../apache/beam/runners/spark/io/hadoop/HadoopIO.java   |  6 +++---
 .../src/main/java/org/apache/beam/sdk/io/AvroIO.java    |  6 +++---
 .../beam/sdk/io/BoundedReadFromUnboundedSource.java     |  6 +++---
 .../src/main/java/org/apache/beam/sdk/io/PubsubIO.java  |  6 +++---
 .../core/src/main/java/org/apache/beam/sdk/io/Read.java | 10 +++++-----
 .../src/main/java/org/apache/beam/sdk/io/TextIO.java    |  6 +++---
 .../java/org/apache/beam/sdk/transforms/Create.java     | 12 ++++++------
 .../test/java/org/apache/beam/sdk/io/AvroIOTest.java    |  2 +-
 .../test/java/org/apache/beam/sdk/io/PubsubIOTest.java  |  2 +-
 .../test/java/org/apache/beam/sdk/io/TextIOTest.java    |  2 +-
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java |  8 ++++----
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java |  3 +--
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  3 +--
 22 files changed, 60 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
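
The change applied throughout this patch is uniform: transforms that begin a
pipeline previously declared their input as the generic PInput; they now
declare PBegin, which identifies them unambiguously as pipeline roots. A
minimal sketch of the resulting shape, assuming the pre-2.0 PTransform#apply
override used in this codebase (the ReadGreetings class below is illustrative
only, not part of this commit):

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical root transform; before this patch it would have been
    // declared as PTransform<PInput, PCollection<String>>.
    class ReadGreetings extends PTransform<PBegin, PCollection<String>> {
      @Override
      public PCollection<String> apply(PBegin input) {
        // PBegin still exposes the pipeline, so composite roots can
        // delegate to other root transforms exactly as before.
        return input.getPipeline().apply(Create.of("hello", "world"));
      }
    }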


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 87023ed..6684553 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -51,11 +51,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +152,7 @@ public class TfIdf {
    * from the documents tagged with which document they are from.
    */
   public static class ReadDocuments
-      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+      extends PTransform<PBegin, PCollection<KV<URI, String>>> {
     private Iterable<URI> uris;
 
     public ReadDocuments(Iterable<URI> uris) {
@@ -165,7 +165,7 @@ public class TfIdf {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PInput input) {
+    public PCollection<KV<URI, String>> apply(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 73688d4..91a1715 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  * <p>This transform is intended to be used by a runner during pipeline translation to convert
  * a Read.Bounded into a Read.Unbounded.
  */
-public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
 
@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
   }
 
   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 0ca94a1..a92d339 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -53,11 +53,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,7 +154,7 @@ public class TFIDF {
    * from the documents tagged with which document they are from.
    */
   public static class ReadDocuments
-      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+      extends PTransform<PBegin, PCollection<KV<URI, String>>> {
     private static final long serialVersionUID = 0;
 
     private Iterable<URI> uris;
@@ -169,7 +169,7 @@ public class TFIDF {
     }
 
     @Override
-    public PCollection<KV<URI, String>> apply(PInput input) {
+    public PCollection<KV<URI, String>> apply(PBegin input) {
       Pipeline pipeline = input.getPipeline();
 
       // Create one TextIO.Read transform for each document

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a0e24b1..0ce4b58 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -400,7 +400,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return windowed;
     } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
         && ((PCollectionList<?>) input).size() == 0) {
-      return (OutputT) Pipeline.applyTransform(input, Create.of());
+      return (OutputT) Pipeline.applyTransform((PBegin) input, Create.of());
     } else if (overrides.containsKey(transform.getClass())) {
       // It is the responsibility of whoever constructs overrides to ensure this is type safe.
       @SuppressWarnings("unchecked")
@@ -2318,7 +2318,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
    * Dataflow runner in streaming mode.
    */
-  private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> {
+  private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;
 
     /** Builds an instance of this class from the overridden transform. */
@@ -2333,7 +2333,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();
 
       return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 85f5e73..866da13 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
  * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
  */
 @Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput
   }
 
   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 208e84c..58a01aa 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -101,8 +101,8 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Description;
@@ -970,7 +970,7 @@ public class DataflowRunnerTest {
     return options;
   }
 
-  private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
+  private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
       throws Exception {
     String mode = streaming ? "streaming" : "batch";
     thrown.expect(UnsupportedOperationException.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index b3beae6..a08c54e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-
 
 /**
  * Create an input stream from Queue.
@@ -49,7 +48,7 @@ public final class CreateStream<T> {
   /**
    * {@link PTransform} for queueing values.
    */
-  public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
+  public static final class QueuedValues<T> extends PTransform<PBegin, PCollection<T>> {
 
     private final Iterable<Iterable<T>> queuedValues;
 
@@ -64,7 +63,7 @@ public final class CreateStream<T> {
     }
 
     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       // Spark streaming micro batches are bounded by default
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index f57c114..8cf2083 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -25,8 +25,8 @@ import kafka.serializer.Decoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 
 /**
  * Read stream from Kafka.
@@ -68,7 +68,7 @@ public final class KafkaIO {
     /**
      * A {@link PTransform} reading from Kafka topics and providing {@link PCollection}.
      */
-    public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+    public static class Unbound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
 
       private final Class<? extends Decoder<K>> keyDecoderClass;
       private final Class<? extends Decoder<V>> valueDecoderClass;
@@ -120,7 +120,7 @@ public final class KafkaIO {
       }
 
       @Override
-      public PCollection<KV<K, V>> apply(PInput input) {
+      public PCollection<KV<K, V>> apply(PBegin input) {
         // Spark streaming micro batches are bounded by default
         return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
             WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 70bec78..042c316 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -58,7 +58,7 @@ public final class HadoopIO {
      * @param <K> the type of the keys
      * @param <V> the type of the values
      */
-    public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+    public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
 
       private final String filepattern;
       private final Class<? extends FileInputFormat<K, V>> formatClass;
@@ -94,7 +94,7 @@ public final class HadoopIO {
       }
 
       @Override
-      public PCollection<KV<K, V>> apply(PInput input) {
+      public PCollection<KV<K, V>> apply(PBegin input) {
         return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
             WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index e7c302b..267265d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 
 /**
  * {@link PTransform}s for reading and writing Avro files.
@@ -184,7 +184,7 @@ public class AvroIO {
      * @param <T> the type of each of the elements of the resulting
      * PCollection
      */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The filepattern to read from. */
       @Nullable
       final String filepattern;
@@ -270,7 +270,7 @@ public class AvroIO {
       }
 
       @Override
-      public PCollection<T> apply(PInput input) {
+      public PCollection<T> apply(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException(
               "need to set the filepattern of an AvroIO.Read transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ede65a9..28d7746 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -48,7 +48,7 @@ import org.joda.time.Instant;
  *
  * <p>Created by {@link Read}.
  */
-class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T>> {
+class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
   private final UnboundedSource<T, ?> source;
   private final long maxNumRecords;
   private final Duration maxReadTime;
@@ -82,7 +82,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
   }
 
   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
         Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
     if (source.requiresDeduping()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index b137f15..d113457 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -46,9 +46,9 @@ import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.util.PubsubJsonClient;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -481,7 +481,7 @@ public class PubsubIO {
      * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
      * a unbounded {@link PCollection} containing the items from the stream.
      */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The Cloud Pub/Sub topic to read from. */
       @Nullable private final PubsubTopic topic;
 
@@ -610,7 +610,7 @@ public class PubsubIO {
       }
 
       @Override
-      public PCollection<T> apply(PInput input) {
+      public PCollection<T> apply(PBegin input) {
         if (topic == null && subscription == null) {
           throw new IllegalStateException("Need to set either the topic or the 
subscription for "
               + "a PubsubIO.Read transform");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f99877d..29c4e47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 
 /**
@@ -87,7 +87,7 @@ public class Read {
   /**
    * {@link PTransform} that reads from a {@link BoundedSource}.
    */
-  public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Bounded<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;
 
     private Bounded(@Nullable String name, BoundedSource<T> source) {
@@ -101,7 +101,7 @@ public class Read {
     }
 
     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();
 
       return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -134,7 +134,7 @@ public class Read {
   /**
    * {@link PTransform} that reads from a {@link UnboundedSource}.
    */
-  public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Unbounded<T> extends PTransform<PBegin, PCollection<T>> {
     private final UnboundedSource<T, ?> source;
 
     private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
@@ -169,7 +169,7 @@ public class Read {
     }
 
     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();
 
       return PCollection.<T>createPrimitiveOutputInternal(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index ed9a627..242470b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -43,9 +43,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 
 /**
  * {@link PTransform}s for reading and writing text files.
@@ -189,7 +189,7 @@ public class TextIO {
      * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a
      * {@code PCollection<T>} instead.
      */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The filepattern to read from. */
       @Nullable private final String filepattern;
 
@@ -269,7 +269,7 @@ public class TextIO {
       }
 
       @Override
-      public PCollection<T> apply(PInput input) {
+      public PCollection<T> apply(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException("need to set the filepattern of a 
TextIO.Read transform");
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index e261db2..7cd4711 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -46,8 +46,8 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -218,7 +218,7 @@ public class Create<T> {
   /**
    * A {@code PTransform} that creates a {@code PCollection} from a set of in-memory objects.
    */
-  public static class Values<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Values<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Returns a {@link Create.Values} PTransform like this one that uses the given
      * {@code Coder<T>} to decode each of the objects into a
@@ -240,7 +240,7 @@ public class Create<T> {
     }
 
     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       try {
         Coder<T> coder = getDefaultOutputCoder(input);
         try {
@@ -257,7 +257,7 @@ public class Create<T> {
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
+    public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
       if (coder.isPresent()) {
         return coder.get();
       } else {
@@ -421,7 +421,7 @@ public class Create<T> {
    * A {@code PTransform} that creates a {@code PCollection} whose elements have
    * associated timestamps.
    */
-  public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{
+  public static class TimestampedValues<T> extends PTransform<PBegin, PCollection<T>>{
     /**
      * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
      * {@code Coder<T>} to decode each of the objects into a
@@ -440,7 +440,7 @@ public class Create<T> {
     }
 
     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       try {
         Iterable<T> rawElements =
             Iterables.transform(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a8a7746..81f05d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -291,7 +291,7 @@ public class AvroIOTest {
         .withSchema(Schema.create(Schema.Type.STRING))
         .withoutValidation();
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("AvroIO.Read should include the file pattern in its primitive 
transform",
         displayData, hasItem(hasDisplayItem("filePattern")));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 4067055..086b726 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -111,7 +111,7 @@ public class PubsubIOTest {
         PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
             .maxNumRecords(1);
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the subscription in its primitive 
display data",
         displayData, hasItem(hasDisplayItem("subscription")));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 358a30f..8f94766 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -210,7 +210,7 @@ public class TextIOTest {
         .from("foobar")
         .withoutValidation();
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("TextIO.Read should include the file prefix in its primitive 
display data",
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 01a8a1c..304dc82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -122,11 +122,11 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Instant;
@@ -377,7 +377,7 @@ public class BigQueryIO {
      * A {@link PTransform} that reads from a BigQuery table and returns a bounded
      * {@link PCollection} of {@link TableRow TableRows}.
      */
-    public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
+    public static class Bound extends PTransform<PBegin, PCollection<TableRow>> {
       @Nullable final String jsonTableRef;
       @Nullable final String query;
 
@@ -480,7 +480,7 @@ public class BigQueryIO {
       }
 
       @Override
-      public void validate(PInput input) {
+      public void validate(PBegin input) {
         // Even if existence validation is disabled, we need to make sure that the BigQueryIO
         // read is properly specified.
         BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
@@ -524,7 +524,7 @@ public class BigQueryIO {
       }
 
       @Override
-      public PCollection<TableRow> apply(PInput input) {
+      public PCollection<TableRow> apply(PBegin input) {
         String uuid = randomUUIDString();
         final String jobIdToken = "beam_job_" + uuid;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7a7575b..57eb4ff 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -671,7 +671,7 @@ public class BigQueryIOTest implements Serializable {
             .withJobService(mockJobService))
         .withoutValidation();
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("BigQueryIO.Read should include the table spec in its primitive 
display data",
         displayData, hasItem(hasDisplayItem("table")));
   }
@@ -688,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
             .withJobService(mockJobService))
         .withoutValidation();
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("BigQueryIO.Read should include the query in its primitive 
display data",
         displayData, hasItem(hasDisplayItem("query")));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f92dbd4..29d0c5f 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -140,7 +139,7 @@ public class JmsIO {
       // handles unbounded source to bounded conversion if maxNumRecords is set.
       Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
 
-      PTransform<PInput, PCollection<JmsRecord>> transform = unbounded;
+      PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;
 
       if (maxNumRecords != Long.MAX_VALUE) {
         transform = unbounded.withMaxNumRecords(maxNumRecords);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 885d5d1..f639422 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -450,7 +449,7 @@ public class KafkaIO {
       Unbounded<KafkaRecord<K, V>> unbounded =
           org.apache.beam.sdk.io.Read.from(makeSource());
 
-      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
 
       if (maxNumRecords < Long.MAX_VALUE) {
         transform = unbounded.withMaxNumRecords(maxNumRecords);

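
From a caller's perspective nothing changes: root transforms are still applied
directly to a Pipeline, and the tighter PBegin bound simply rejects at compile
time any attempt to apply them to an existing PCollection. A small usage
sketch, assuming the 0.x-era TextIO.Read API exercised above (the file pattern
is a placeholder):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // TextIO.Read.from(...) is now a PTransform<PBegin, PCollection<String>>,
        // so the compiler accepts it only at a pipeline root.
        PCollection<String> lines = p.apply(TextIO.Read.from("gs://my-bucket/input-*"));
        p.run();
      }
    }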
