Repository: incubator-beam
Updated Branches:
  refs/heads/master 9abd0bc8a -> 95ab43809
[BEAM-589] Fixing IO.Read transformation

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/310ea749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/310ea749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/310ea749

Branch: refs/heads/master
Commit: 310ea7497a151d1a9567f3e9a3b18e54ddcdc7f0
Parents: 9abd0bc
Author: gaurav gupta <gaugu...@cisco.com>
Authored: Thu Aug 25 14:00:06 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Aug 26 15:14:11 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/examples/complete/TfIdf.java    |  6 +++---
 .../runners/core/UnboundedReadFromBoundedSource.java     |  6 +++---
 .../org/apache/beam/runners/flink/examples/TFIDF.java    |  6 +++---
 .../apache/beam/runners/dataflow/DataflowRunner.java     |  6 +++---
 .../DataflowUnboundedReadFromBoundedSource.java          |  6 +++---
 .../beam/runners/dataflow/DataflowRunnerTest.java        |  4 ++--
 .../org/apache/beam/runners/spark/io/CreateStream.java   |  7 +++----
 .../java/org/apache/beam/runners/spark/io/KafkaIO.java   |  6 +++---
 .../apache/beam/runners/spark/io/hadoop/HadoopIO.java    |  6 +++---
 .../src/main/java/org/apache/beam/sdk/io/AvroIO.java     |  6 +++---
 .../beam/sdk/io/BoundedReadFromUnboundedSource.java      |  6 +++---
 .../src/main/java/org/apache/beam/sdk/io/PubsubIO.java   |  6 +++---
 .../core/src/main/java/org/apache/beam/sdk/io/Read.java  | 10 +++++-----
 .../src/main/java/org/apache/beam/sdk/io/TextIO.java     |  6 +++---
 .../java/org/apache/beam/sdk/transforms/Create.java      | 12 ++++++------
 .../test/java/org/apache/beam/sdk/io/AvroIOTest.java     |  2 +-
 .../test/java/org/apache/beam/sdk/io/PubsubIOTest.java   |  2 +-
 .../test/java/org/apache/beam/sdk/io/TextIOTest.java     |  2 +-
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java  |  8 ++++----
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java  |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java  |  3 +--
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  |  3 +--
 22 files changed, 60 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
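Note: every hunk below makes the same change: source-style root transforms move from PTransform<PInput, PCollection<...>> to PTransform<PBegin, PCollection<...>>, matching the fact that they can only be applied at the start of a pipeline. A minimal sketch of the resulting authoring pattern (ReadGreetings and its body are illustrative only, not part of this commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadGreetingsExample {
      // A root transform now declares PBegin, not PInput, as its input type.
      static class ReadGreetings extends PTransform<PBegin, PCollection<String>> {
        @Override
        public PCollection<String> apply(PBegin input) {
          // PBegin still exposes the pipeline, so transform bodies are unchanged.
          return input.apply(Create.of("hello", "world"));
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Applying at the pipeline root hands the transform a PBegin.
        PCollection<String> greetings = p.apply(new ReadGreetings());
        p.run();
      }
    }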
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 87023ed..6684553 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -51,11 +51,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +152,7 @@ public class TfIdf {
    * from the documents tagged with which document they are from.
    */
   public static class ReadDocuments
-      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+      extends PTransform<PBegin, PCollection<KV<URI, String>>> {
     private Iterable<URI> uris;

     public ReadDocuments(Iterable<URI> uris) {
@@ -165,7 +165,7 @@ public class TfIdf {
     }

     @Override
-    public PCollection<KV<URI, String>> apply(PInput input) {
+    public PCollection<KV<URI, String>> apply(PBegin input) {
       Pipeline pipeline = input.getPipeline();

       // Create one TextIO.Read transform for each document
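For context, a sketch of how this composite is applied after the change (options and listInputDocuments are assumed from the surrounding TfIdf example, not shown in this diff):

    // ReadDocuments is now rooted at PBegin, so it is applied directly to the
    // pipeline; input.getPipeline() inside apply() behaves exactly as before.
    Pipeline pipeline = Pipeline.create(options);
    PCollection<KV<URI, String>> docs =
        pipeline.apply(new ReadDocuments(listInputDocuments(options)));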
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 73688d4..91a1715 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
 * <p>This transform is intended to be used by a runner during pipeline translation to convert
 * a Read.Bounded into a Read.Unbounded.
 */
-public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {

   private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);

@@ -88,7 +88,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
   }

   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }
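A rough sketch of the runner-side use described in the javadoc above (the method name and wiring are illustrative only):

    // During translation a runner can substitute the unbounded adapter for a
    // Read.Bounded; with PBegin in the signature the substitution type-checks
    // against the other root transforms it replaces.
    static <T> PCollection<T> translateAsUnbounded(PBegin begin, BoundedSource<T> source) {
      return begin.apply(new UnboundedReadFromBoundedSource<>(source));
    }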
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 0ca94a1..a92d339 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -53,11 +53,11 @@ import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,7 +154,7 @@ public class TFIDF {
    * from the documents tagged with which document they are from.
    */
   public static class ReadDocuments
-      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+      extends PTransform<PBegin, PCollection<KV<URI, String>>> {
     private static final long serialVersionUID = 0;

     private Iterable<URI> uris;
@@ -169,7 +169,7 @@ public class TFIDF {
     }

     @Override
-    public PCollection<KV<URI, String>> apply(PInput input) {
+    public PCollection<KV<URI, String>> apply(PBegin input) {
       Pipeline pipeline = input.getPipeline();

       // Create one TextIO.Read transform for each document
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a0e24b1..0ce4b58 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -400,7 +400,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return windowed;
     } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
         && ((PCollectionList<?>) input).size() == 0) {
-      return (OutputT) Pipeline.applyTransform(input, Create.of());
+      return (OutputT) Pipeline.applyTransform((PBegin) input, Create.of());
     } else if (overrides.containsKey(transform.getClass())) {
       // It is the responsibility of whoever constructs overrides to ensure this is type safe.
       @SuppressWarnings("unchecked")
@@ -2318,7 +2318,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
    * Dataflow runner in streaming mode.
    */
-  private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> {
+  private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;

    /** Builds an instance of this class from the overridden transform. */
@@ -2333,7 +2333,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }

     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();

       return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 85f5e73..866da13 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
 * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
 */
@Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {

  private static final Logger LOG = LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);

@@ -93,7 +93,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput
   }

   @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
     return input.getPipeline().apply(
         Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 208e84c..58a01aa 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -101,8 +101,8 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Description;
@@ -970,7 +970,7 @@ public class DataflowRunnerTest {
     return options;
   }

-  private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
+  private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
       throws Exception {
     String mode = streaming ? "streaming" : "batch";
     thrown.expect(UnsupportedOperationException.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index b3beae6..a08c54e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;

 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-

 /**
  * Create an input stream from Queue.
@@ -49,7 +48,7 @@ public final class CreateStream<T> {

   /**
    * {@link PTransform} for queueing values.
   */
-  public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
+  public static final class QueuedValues<T> extends PTransform<PBegin, PCollection<T>> {

     private final Iterable<Iterable<T>> queuedValues;

@@ -64,7 +63,7 @@ public final class CreateStream<T> {
     }

     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       // Spark streaming micro batches are bounded by default
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index f57c114..8cf2083 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -25,8 +25,8 @@ import kafka.serializer.Decoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;

 /**
  * Read stream from Kafka.
@@ -68,7 +68,7 @@ public final class KafkaIO {
   /**
    * A {@link PTransform} reading from Kafka topics and providing {@link PCollection}.
    */
-  public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+  public static class Unbound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {

     private final Class<? extends Decoder<K>> keyDecoderClass;
     private final Class<? extends Decoder<V>> valueDecoderClass;
@@ -120,7 +120,7 @@ public final class KafkaIO {
     }

     @Override
-    public PCollection<KV<K, V>> apply(PInput input) {
+    public PCollection<KV<K, V>> apply(PBegin input) {
       // Spark streaming micro batches are bounded by default
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 70bec78..042c316 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@@ -58,7 +58,7 @@ public final class HadoopIO {
    * @param <K> the type of the keys
    * @param <V> the type of the values
    */
-  public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
+  public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {

     private final String filepattern;
     private final Class<? extends FileInputFormat<K, V>> formatClass;
@@ -94,7 +94,7 @@ public final class HadoopIO {
     }

     @Override
-    public PCollection<KV<K, V>> apply(PInput input) {
+    public PCollection<KV<K, V>> apply(PBegin input) {
       return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
           WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index e7c302b..267265d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;

 /**
  * {@link PTransform}s for reading and writing Avro files.
@@ -184,7 +184,7 @@ public class AvroIO {
      * @param <T> the type of each of the elements of the resulting
      * PCollection
      */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The filepattern to read from. */
       @Nullable final String filepattern;
@@ -270,7 +270,7 @@ public class AvroIO {
       }

       @Override
-      public PCollection<T> apply(PInput input) {
+      public PCollection<T> apply(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException(
               "need to set the filepattern of an AvroIO.Read transform");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ede65a9..28d7746 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

@@ -48,7 +48,7 @@ import org.joda.time.Instant;
 *
 * <p>Created by {@link Read}.
 */
-class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T>> {
+class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
  private final UnboundedSource<T, ?> source;
  private final long maxNumRecords;
  private final Duration maxReadTime;
@@ -82,7 +82,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T
  }

  @Override
-  public PCollection<T> apply(PInput input) {
+  public PCollection<T> apply(PBegin input) {
    PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
        Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
    if (source.requiresDeduping()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index b137f15..d113457 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -46,9 +46,9 @@ import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.util.PubsubJsonClient;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -481,7 +481,7 @@ public class PubsubIO {
    * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
    * a unbounded {@link PCollection} containing the items from the stream.
    */
-  public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
     /** The Cloud Pub/Sub topic to read from. */
     @Nullable private final PubsubTopic topic;
@@ -610,7 +610,7 @@ public class PubsubIO {
     }

     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       if (topic == null && subscription == null) {
         throw new IllegalStateException("Need to set either the topic or the subscription for "
             + "a PubsubIO.Read transform");
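As an aside, the capped-read path through BoundedReadFromUnboundedSource is reachable from user code roughly like this (the subscription string is illustrative; the builder methods match the PubsubIOTest hunk later in this diff):

    // maxNumRecords converts the unbounded Pub/Sub read into a bounded one via
    // BoundedReadFromUnboundedSource; both variants now start from PBegin.
    PCollection<String> messages = pipeline.apply(
        PubsubIO.Read.subscription("projects/my-project/subscriptions/my-subscription")
            .maxNumRecords(1000));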
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index f99877d..29c4e47 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;

 /**
@@ -87,7 +87,7 @@ public class Read {
   /**
    * {@link PTransform} that reads from a {@link BoundedSource}.
    */
-  public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Bounded<T> extends PTransform<PBegin, PCollection<T>> {
     private final BoundedSource<T> source;

     private Bounded(@Nullable String name, BoundedSource<T> source) {
@@ -101,7 +101,7 @@ public class Read {
     }

     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();

       return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
@@ -134,7 +134,7 @@ public class Read {
   /**
    * {@link PTransform} that reads from a {@link UnboundedSource}.
   */
-  public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Unbounded<T> extends PTransform<PBegin, PCollection<T>> {
     private final UnboundedSource<T, ?> source;

     private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
@@ -169,7 +169,7 @@ public class Read {
     }

     @Override
-    public final PCollection<T> apply(PInput input) {
+    public final PCollection<T> apply(PBegin input) {
       source.validate();

       return PCollection.<T>createPrimitiveOutputInternal(
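A sketch of what the new Read signatures give call sites (the source parameter is a placeholder):

    // Read.from(...) now yields a PTransform<PBegin, PCollection<T>> in both
    // the bounded and unbounded cases, so no casts through PInput are needed.
    static PCollection<String> readLines(Pipeline p, BoundedSource<String> source) {
      Read.Bounded<String> read = Read.from(source);
      return p.apply(read);
    }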
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index ed9a627..242470b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -43,9 +43,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;

 /**
  * {@link PTransform}s for reading and writing text files.
@@ -189,7 +189,7 @@ public class TextIO {
      * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a
      * {@code PCollection<T>} instead.
      */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
+    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The filepattern to read from. */
       @Nullable private final String filepattern;
@@ -269,7 +269,7 @@ public class TextIO {
       }

       @Override
-      public PCollection<T> apply(PInput input) {
+      public PCollection<T> apply(PBegin input) {
         if (filepattern == null) {
           throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index e261db2..7cd4711 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -46,8 +46,8 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -218,7 +218,7 @@ public class Create<T> {
   /**
    * A {@code PTransform} that creates a {@code PCollection} from a set of in-memory objects.
   */
-  public static class Values<T> extends PTransform<PInput, PCollection<T>> {
+  public static class Values<T> extends PTransform<PBegin, PCollection<T>> {
     /**
      * Returns a {@link Create.Values} PTransform like this one that uses the given
      * {@code Coder<T>} to decode each of the objects into a
@@ -240,7 +240,7 @@ public class Create<T> {
     }

     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       try {
         Coder<T> coder = getDefaultOutputCoder(input);
         try {
@@ -257,7 +257,7 @@ public class Create<T> {
     }

     @Override
-    public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
+    public Coder<T> getDefaultOutputCoder(PBegin input) throws CannotProvideCoderException {
       if (coder.isPresent()) {
         return coder.get();
       } else {
@@ -421,7 +421,7 @@ public class Create<T> {
    * A {@code PTransform} that creates a {@code PCollection} whose elements have
    * associated timestamps.
    */
-  public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{
+  public static class TimestampedValues<T> extends PTransform<PBegin, PCollection<T>>{
     /**
      * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
      * {@code Coder<T>} to decode each of the objects into a
@@ -440,7 +440,7 @@ public class Create<T> {
     }

     @Override
-    public PCollection<T> apply(PInput input) {
+    public PCollection<T> apply(PBegin input) {
       try {
         Iterable<T> rawElements =
             Iterables.transform(
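Since getDefaultOutputCoder now takes PBegin, coder resolution for Create is unchanged from a user's perspective; a quick sketch (assuming the builder shown above):

    // An explicit coder still takes precedence over inference inside
    // getDefaultOutputCoder(PBegin).
    PCollection<Integer> nums = pipeline.apply(
        Create.of(1, 2, 3).withCoder(BigEndianIntegerCoder.of()));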
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a8a7746..81f05d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -291,7 +291,7 @@ public class AvroIOTest {
         .withSchema(Schema.create(Schema.Type.STRING))
         .withoutValidation();

-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

     assertThat("AvroIO.Read should include the file pattern in its primitive transform",
         displayData, hasItem(hasDisplayItem("filePattern")));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 4067055..086b726 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -111,7 +111,7 @@ public class PubsubIOTest {
         PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
             .maxNumRecords(1);

-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

     assertThat("PubsubIO.Read should include the subscription in its primitive display data",
         displayData, hasItem(hasDisplayItem("subscription")));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 358a30f..8f94766 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -210,7 +210,7 @@ public class TextIOTest {
         .from("foobar")
         .withoutValidation();

-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

     assertThat("TextIO.Read should include the file prefix in its primitive display data",
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 01a8a1c..304dc82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -122,11 +122,11 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Instant;
@@ -377,7 +377,7 @@ public class BigQueryIO {
    * A {@link PTransform} that reads from a BigQuery table and returns a bounded
    * {@link PCollection} of {@link TableRow TableRows}.
    */
-  public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
+  public static class Bound extends PTransform<PBegin, PCollection<TableRow>> {
     @Nullable final String jsonTableRef;
     @Nullable final String query;
@@ -480,7 +480,7 @@ public class BigQueryIO {
     }

     @Override
-    public void validate(PInput input) {
+    public void validate(PBegin input) {
       // Even if existence validation is disabled, we need to make sure that the BigQueryIO
       // read is properly specified.
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
@@ -524,7 +524,7 @@ public class BigQueryIO {
     }

     @Override
-    public PCollection<TableRow> apply(PInput input) {
+    public PCollection<TableRow> apply(PBegin input) {
       String uuid = randomUUIDString();
       final String jobIdToken = "beam_job_" + uuid;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7a7575b..57eb4ff 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -671,7 +671,7 @@ public class BigQueryIOTest implements Serializable {
             .withJobService(mockJobService))
         .withoutValidation();

-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
        displayData, hasItem(hasDisplayItem("table")));
  }
@@ -688,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
             .withJobService(mockJobService))
         .withoutValidation();

-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

     assertThat("BigQueryIO.Read should include the query in its primitive display data",
         displayData, hasItem(hasDisplayItem("query")));
   }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f92dbd4..29d0c5f 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -140,7 +139,7 @@ public class JmsIO {
       // handles unbounded source to bounded conversion if maxNumRecords is set.
       Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());

-      PTransform<PInput, PCollection<JmsRecord>> transform = unbounded;
+      PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded;

       if (maxNumRecords != Long.MAX_VALUE) {
         transform = unbounded.withMaxNumRecords(maxNumRecords);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/310ea749/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 885d5d1..f639422 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -450,7 +449,7 @@ public class KafkaIO {
       Unbounded<KafkaRecord<K, V>> unbounded =
           org.apache.beam.sdk.io.Read.from(makeSource());

-      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;

       if (maxNumRecords < Long.MAX_VALUE) {
         transform = unbounded.withMaxNumRecords(maxNumRecords);
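Note on the JmsIO and KafkaIO hunks above: the local variable was previously typed PTransform<PInput, ...> because that was the common supertype of Read.Unbounded and the capped transform returned by withMaxNumRecords; after this change the common type is PTransform<PBegin, ...>, so both branches assign cleanly:

    // Both the uncapped and capped reads now extend PTransform<PBegin, ...>,
    // so a single variable covers either path before the final apply.
    PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
    if (maxNumRecords < Long.MAX_VALUE) {
      transform = unbounded.withMaxNumRecords(maxNumRecords);
    }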