Replaces all usages of CountingInput with GenerateSequence

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a9a24c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a9a24c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a9a24c0

Branch: refs/heads/master
Commit: 6a9a24c064518bb83a7383babdff9b263dc61346
Parents: 88c6612
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Apr 19 15:32:08 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Apr 21 16:53:50 2017 -0700

----------------------------------------------------------------------
 .../translation/ReadUnboundTranslatorTest.java  |  6 +-
 .../EmptyFlattenAsCreateFactoryTest.java        | 10 ++--
 .../core/construction/PCollectionsTest.java     |  6 +-
 .../PTransformReplacementsTest.java             |  4 +-
 .../core/construction/PTransformsTest.java      | 17 +++---
 .../core/construction/SdkComponentsTest.java    | 16 +++---
 .../runners/direct/DirectGraphVisitorTest.java  |  6 +-
 .../beam/runners/direct/DirectRunnerTest.java   | 10 ++--
 .../runners/direct/EvaluationContextTest.java   |  4 +-
 .../beam/runners/flink/ReadSourceITCase.java    |  4 +-
 .../flink/ReadSourceStreamingITCase.java        |  4 +-
 .../streaming/StreamingSourceMetricsTest.java   |  7 ++-
 .../org/apache/beam/sdk/io/CountingSource.java  | 43 ++++-----------
 .../apache/beam/sdk/io/GenerateSequence.java    |  5 ++
 .../org/apache/beam/sdk/values/PCollection.java |  9 +--
 .../java/org/apache/beam/sdk/PipelineTest.java  | 58 +++++++++++---------
 .../beam/sdk/io/GenerateSequenceTest.java       | 18 +++---
 .../apache/beam/sdk/metrics/MetricsTest.java    | 12 ++--
 .../sdk/runners/TransformHierarchyTest.java     |  7 +--
 .../beam/sdk/testing/GatherAllPanesTest.java    |  8 +--
 .../apache/beam/sdk/testing/PAssertTest.java    | 12 ++--
 .../apache/beam/sdk/transforms/FlattenTest.java |  8 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../sdk/transforms/windowing/WindowTest.java    |  4 +-
 .../beam/sdk/values/PCollectionListTest.java    | 25 +++++----
 .../beam/sdk/values/PCollectionTupleTest.java   |  6 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  6 +-
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    |  4 +-
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    |  4 +-
 29 files changed, 162 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
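
The per-file hunks below all apply the same mechanical mapping from CountingInput to GenerateSequence. A minimal sketch of that mapping, assuming the usual pipeline scaffolding (the class name and step names here are illustrative, not part of the commit; the GenerateSequence calls are the ones used in the hunks below):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class GenerateSequenceMigrationSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // CountingInput.upTo(n)      ->  GenerateSequence.fromTo(0, n), the bounded range [0, n).
        PCollection<Long> bounded = p.apply("bounded", GenerateSequence.fromTo(0, 1000));

        // CountingInput.unbounded()  ->  GenerateSequence.from(0), an unbounded stream of longs.
        PCollection<Long> unbounded = p.apply("unbounded", GenerateSequence.from(0));

        // Rate control carries over unchanged (see DirectRunnerTest below).
        p.apply("throttled", GenerateSequence.from(0).withRate(1L, Duration.standardSeconds(1)));

        // Downstream transforms and p.run() are elided; the unbounded reads run until cancelled.
      }
    }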


http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
index 6d19bb9..e0cc251 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
@@ -35,7 +35,7 @@ import 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInput
 import org.apache.beam.runners.apex.translation.utils.CollectionSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -92,12 +92,12 @@ public class ReadUnboundTranslatorTest {
     Pipeline p = Pipeline.create(options);
 
     Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), 
DiscreteDomain.longs());
-    p.apply(CountingInput.upTo(10))
+    p.apply(GenerateSequence.fromTo(0, 10))
         .apply(ParDo.of(new EmbeddedCollector()));
 
     ApexRunnerResult result = (ApexRunnerResult) p.run();
     DAG dag = result.getApexDAG();
-    String operatorName = 
"CountingInput.BoundedCountingInput/Read(BoundedCountingSource)";
+    String operatorName = "GenerateSequence/Read(BoundedCountingSource)";
     DAG.OperatorMeta om = dag.getOperatorMeta(operatorName);
     Assert.assertNotNull(om);
     Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
index ae2d0a9..bfa3190 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertThat;
 
 import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import 
org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -71,8 +71,8 @@ public class EmptyFlattenAsCreateFactoryTest {
   @Test
   public void getInputNonEmptyThrows() {
     PCollectionList<Long> nonEmpty =
-        PCollectionList.of(pipeline.apply(CountingInput.unbounded()))
-            .and(pipeline.apply(CountingInput.upTo(100L)));
+        PCollectionList.of(pipeline.apply("unbounded", 
GenerateSequence.from(0)))
+            .and(pipeline.apply("bounded", GenerateSequence.fromTo(0, 100)));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(nonEmpty.expand().toString());
     thrown.expectMessage(EmptyFlattenAsCreateFactory.class.getSimpleName());
@@ -87,8 +87,8 @@ public class EmptyFlattenAsCreateFactoryTest {
 
   @Test
   public void mapOutputsSucceeds() {
-    PCollection<Long> original = pipeline.apply("Original", 
CountingInput.unbounded());
-    PCollection<Long> replacement = pipeline.apply("Replacement", 
CountingInput.unbounded());
+    PCollection<Long> original = pipeline.apply("Original", 
GenerateSequence.from(0));
+    PCollection<Long> replacement = pipeline.apply("Replacement", 
GenerateSequence.from(0));
     Map<PValue, ReplacementOutput> mapping = 
factory.mapOutputs(original.expand(), replacement);
 
     assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index 636d245..48aa1f1 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -71,7 +71,7 @@ public class PCollectionsTest {
   public static Iterable<PCollection<?>> data() {
     Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3));
-    PCollection<Long> longs = pipeline.apply("unbounded longs", 
CountingInput.unbounded());
+    PCollection<Long> longs = pipeline.apply("unbounded longs", 
GenerateSequence.from(0));
     PCollection<Long> windowedLongs =
         longs.apply(
             "into fixed windows",
@@ -83,7 +83,7 @@ public class PCollectionsTest {
             .apply("group", GroupByKey.<String, String>create());
     PCollection<Long> coderLongs =
         pipeline
-            .apply("counts with alternative coder", CountingInput.upTo(10L))
+            .apply("counts with alternative coder", GenerateSequence.fromTo(0, 
10))
             .setCoder(BigEndianLongCoder.of());
     PCollection<Integer> allCustomInts =
         pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java
index b065617..c6c50f0 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -47,7 +47,7 @@ import org.junit.runners.JUnit4;
 public class PTransformReplacementsTest {
   @Rule public TestPipeline pipeline = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public ExpectedException thrown = ExpectedException.none();
-  private PCollection<Long> mainInput = 
pipeline.apply(CountingInput.unbounded());
+  private PCollection<Long> mainInput = 
pipeline.apply(GenerateSequence.from(0));
   private PCollectionView<String> sideInput =
       pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
index 4e3cdb6..4ef70c0 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
@@ -35,9 +35,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -75,7 +74,7 @@ public class PTransformsTest {
     TestPipeline compositeReadPipeline = TestPipeline.create();
     ToAndFromProtoSpec compositeRead =
         ToAndFromProtoSpec.composite(
-            countingInput(compositeReadPipeline),
+            generateSequence(compositeReadPipeline),
             ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
     return ImmutableList.<ToAndFromProtoSpec>builder()
         .add(readLeaf)
@@ -152,11 +151,11 @@ public class PTransformsTest {
     @ProcessElement public void process(ProcessContext context) {}
   }
 
-  private static AppliedPTransform<?, ?, ?> countingInput(Pipeline pipeline) {
-    UnboundedCountingInput input = CountingInput.unbounded();
-    PCollection<Long> pcollection = pipeline.apply(input);
-    return AppliedPTransform.<PBegin, PCollection<Long>, 
UnboundedCountingInput>of(
-        "Count", pipeline.begin().expand(), pcollection.expand(), input, 
pipeline);
+  private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline 
pipeline) {
+    GenerateSequence sequence = GenerateSequence.from(0);
+    PCollection<Long> pcollection = pipeline.apply(sequence);
+    return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
+        "Count", pipeline.begin().expand(), pcollection.expand(), sequence, 
pipeline);
   }
 
   private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
@@ -169,7 +168,7 @@ public class PTransformsTest {
   private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) 
{
     PCollectionView<String> view =
         pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
-    PCollection<Long> input = pipeline.apply(CountingInput.unbounded());
+    PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
     ParDo.MultiOutput<Long, KV<Long, String>> parDo =
         ParDo.of(new TestDoFn())
             .withSideInputs(view)

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 895aec4..021f19c 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -106,7 +106,7 @@ public class SdkComponentsTest {
   @Test
   public void registerTransformAfterChildren() throws IOException {
     Create.Values<Long> create = Create.of(1L, 2L, 3L);
-    CountingInput.UnboundedCountingInput createChild = 
CountingInput.unbounded();
+    GenerateSequence createChild = GenerateSequence.from(0);
 
     PCollection<Long> pt = pipeline.apply(create);
     String userName = "my_transform";
@@ -115,7 +115,7 @@ public class SdkComponentsTest {
         AppliedPTransform.<PBegin, PCollection<Long>, Create.Values<Long>>of(
             userName, pipeline.begin().expand(), pt.expand(), create, 
pipeline);
     AppliedPTransform<?, ?, ?> childTransform =
-        AppliedPTransform.<PBegin, PCollection<Long>, 
CountingInput.UnboundedCountingInput>of(
+        AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
             childUserName, pipeline.begin().expand(), pt.expand(), 
createChild, pipeline);
 
     String childId = components.registerPTransform(childTransform,
@@ -159,7 +159,7 @@ public class SdkComponentsTest {
   @Test
   public void registerTransformWithUnregisteredChildren() throws IOException {
     Create.Values<Long> create = Create.of(1L, 2L, 3L);
-    CountingInput.UnboundedCountingInput createChild = 
CountingInput.unbounded();
+    GenerateSequence createChild = GenerateSequence.from(0);
 
     PCollection<Long> pt = pipeline.apply(create);
     String userName = "my_transform";
@@ -168,7 +168,7 @@ public class SdkComponentsTest {
         AppliedPTransform.<PBegin, PCollection<Long>, Create.Values<Long>>of(
             userName, pipeline.begin().expand(), pt.expand(), create, 
pipeline);
     AppliedPTransform<?, ?, ?> childTransform =
-        AppliedPTransform.<PBegin, PCollection<Long>, 
CountingInput.UnboundedCountingInput>of(
+        AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
             childUserName, pipeline.begin().expand(), pt.expand(), 
createChild, pipeline);
 
     thrown.expect(IllegalArgumentException.class);
@@ -179,7 +179,7 @@ public class SdkComponentsTest {
 
   @Test
   public void registerPCollection() throws IOException {
-    PCollection<Long> pCollection = 
pipeline.apply(CountingInput.unbounded()).setName("foo");
+    PCollection<Long> pCollection = 
pipeline.apply(GenerateSequence.from(0)).setName("foo");
     String id = components.registerPCollection(pCollection);
     assertThat(id, equalTo("foo"));
     components.toComponents().getPcollectionsOrThrow(id);
@@ -188,10 +188,10 @@ public class SdkComponentsTest {
   @Test
   public void registerPCollectionExistingNameCollision() throws IOException {
     PCollection<Long> pCollection =
-        pipeline.apply("FirstCount", CountingInput.unbounded()).setName("foo");
+        pipeline.apply("FirstCount", GenerateSequence.from(0)).setName("foo");
     String firstId = components.registerPCollection(pCollection);
     PCollection<Long> duplicate =
-        pipeline.apply("SecondCount", 
CountingInput.unbounded()).setName("foo");
+        pipeline.apply("SecondCount", GenerateSequence.from(0)).setName("foo");
     String secondId = components.registerPCollection(duplicate);
     assertThat(firstId, equalTo("foo"));
     assertThat(secondId, containsString("foo"));

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index b44c890..2afb6c3 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -24,11 +24,10 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -89,11 +88,10 @@ public class DirectGraphVisitorTest implements Serializable 
{
   public void getRootTransformsContainsRootTransforms() {
     PCollection<String> created = p.apply(Create.of("foo", "bar"));
     PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
-    PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
+    PCollection<Long> unCounted = p.apply(GenerateSequence.from(0));
     p.traverseTopologically(visitor);
     DirectGraph graph = visitor.getGraph();
     assertThat(graph.getRootTransforms(), hasSize(3));
-    List<PTransform<?, ?>> unapplied = new ArrayList<>();
     assertThat(
         graph.getRootTransforms(),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 246c111..f9063f0 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -52,8 +52,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -246,7 +246,7 @@ public class DirectRunnerTest implements Serializable {
     opts.setRunner(DirectRunner.class);
 
     final Pipeline p = Pipeline.create(opts);
-    p.apply(CountingInput.unbounded().withRate(1L, 
Duration.standardSeconds(1)));
+    p.apply(GenerateSequence.from(0).withRate(1L, 
Duration.standardSeconds(1)));
 
     final BlockingQueue<PipelineResult> resultExchange = new 
ArrayBlockingQueue<>(1);
     Runnable cancelRunnable = new Runnable() {
@@ -513,8 +513,7 @@ public class DirectRunnerTest implements Serializable {
   @Test
   public void testUnencodableOutputFromBoundedRead() throws Exception {
     Pipeline p = getPipeline();
-    PCollection<Long> pCollection =
-        p.apply(CountingInput.upTo(10)).setCoder(new LongNoDecodeCoder());
+    p.apply(GenerateSequence.fromTo(0, 10)).setCoder(new LongNoDecodeCoder());
 
     thrown.expectCause(isA(CoderException.class));
     thrown.expectMessage("Cannot decode a long");
@@ -524,8 +523,7 @@ public class DirectRunnerTest implements Serializable {
   @Test
   public void testUnencodableOutputFromUnboundedRead() {
     Pipeline p = getPipeline();
-    PCollection<Long> pCollection =
-        p.apply(CountingInput.unbounded()).setCoder(new LongNoDecodeCoder());
+    p.apply(GenerateSequence.from(0)).setCoder(new LongNoDecodeCoder());
 
     thrown.expectCause(isA(CoderException.class));
     thrown.expectMessage("Cannot decode a long");

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 7a65493..35b6709 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -44,7 +44,7 @@ import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -103,7 +103,7 @@ public class EvaluationContextTest {
     created = p.apply(Create.of(1, 2, 3));
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
     view = created.apply(View.<Integer>asIterable());
-    unbounded = p.apply(CountingInput.unbounded());
+    unbounded = p.apply(GenerateSequence.from(0));
 
     KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = 
KeyedPValueTrackingVisitor.create();
     p.traverseTopologically(keyedPValueTrackingVisitor);

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 44c9017..8302983 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -21,7 +21,7 @@ import com.google.common.base.Joiner;
 import java.io.File;
 import java.net.URI;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -68,7 +68,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
     Pipeline p = FlinkTestPipeline.createForBatch();
 
     PCollection<String> result = p
-        .apply(CountingInput.upTo(10))
+        .apply(GenerateSequence.fromTo(0, 10))
         .apply(ParDo.of(new DoFn<Long, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index 79b7882..c7a044e 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink;
 
 import com.google.common.base.Joiner;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -58,7 +58,7 @@ public class ReadSourceStreamingITCase extends 
StreamingProgramTestBase {
     Pipeline p = FlinkTestPipeline.createForStreaming();
 
     p
-      .apply(CountingInput.upTo(10))
+      .apply(GenerateSequence.fromTo(0, 10))
       .apply(ParDo.of(new DoFn<Long, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index ea76d31..85fdf72 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -27,11 +27,12 @@ import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -53,7 +54,9 @@ public class StreamingSourceMetricsTest implements 
Serializable {
 
     final long numElements = 1000;
 
-    pipeline.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+    pipeline.apply(
+        // Use maxReadTime to force unbounded mode.
+        GenerateSequence.fromTo(0, 
numElements).withMaxReadTime(Duration.standardDays(1)));
 
     PipelineResult pipelineResult = pipeline.run();
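
GenerateSequence has no direct counterpart to the old CountingInput.unbounded().withMaxNumRecords(n), so the hunk above bounds the range and adds withMaxReadTime, which per the in-line comment keeps the transform on the unbounded-source path while the range still caps the element count. A condensed sketch of that pattern, with an illustrative class name and the same generous one-day cap used in the test:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.joda.time.Duration;

    public class UnboundedModeSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        long numElements = 1000;

        // withMaxReadTime forces the unbounded implementation; fromTo(0, numElements)
        // still caps the read at numElements elements, as the metric assertions expect.
        pipeline.apply(
            GenerateSequence.fromTo(0, numElements).withMaxReadTime(Duration.standardDays(1)));

        pipeline.run();
      }
    }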
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index dd018f4..b66a8b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -39,44 +38,26 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
- * A source that produces longs. When used as a {@link BoundedSource}, {@link 
CountingSource}
- * starts at {@code 0} and counts up to a specified maximum. When used as an
- * {@link UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then 
never produces more
- * output. (In practice, this limit should never be reached.)
+ * Most users should use {@link GenerateSequence} instead.
+ *
+ * <p>A source that produces longs. When used as a {@link BoundedSource}, 
{@link CountingSource}
+ * starts at {@code 0} and counts up to a specified maximum. When used as an 
{@link
+ * UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never 
produces more output. (In
+ * practice, this limit should never be reached.)
  *
  * <p>The bounded {@link CountingSource} is implemented based on {@link 
OffsetBasedSource} and
  * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient 
initial splitting and it
  * supports dynamic work rebalancing.
  *
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link 
CountingSource#upTo(long)}:
- *
- * <pre>{@code
- * Pipeline p = ...
- * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
- * PCollection<Long> bounded = p.apply(producer);
- * }</pre>
- *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link 
CountingInput#unbounded()},
- * calling {@link 
UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
- * with timestamps other than {@link Instant#now}.
- *
- * <pre>{@code
- * Pipeline p = ...
- *
- * // To create an unbounded PCollection that uses processing time as the 
element timestamp.
- * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
- * // Or, to create an unbounded source that uses a provided function to set 
the element timestamp.
- * PCollection<Long> unboundedWithTimestamps =
- *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
- *
- * }</pre>
+ * <p>To produce a bounded source, use {@link #createSourceForSubrange(long, 
long)}. To produce an
+ * unbounded source, use {@link #createUnboundedFrom(long)}.
  */
 public class CountingSource {
   /**
    * Creates a {@link BoundedSource} that will produce the specified number of 
elements,
    * from {@code 0} to {@code numElements - 1}.
    *
-   * @deprecated use {@link CountingInput#upTo(long)} instead
+   * @deprecated use {@link GenerateSequence} instead
    */
   @Deprecated
   public static BoundedSource<Long> upTo(long numElements) {
@@ -117,7 +98,7 @@ public class CountingSource {
    * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} 
will have timestamps
    * corresponding to processing time at element generation, provided by 
{@link Instant#now}.
    *
-   * @deprecated use {@link CountingInput#unbounded()} instead
+   * @deprecated use {@link GenerateSequence} instead
    */
   @Deprecated
   public static UnboundedSource<Long, CounterMark> unbounded() {
@@ -133,8 +114,8 @@ public class CountingSource {
    *
    * <p>Note that the timestamps produced by {@code timestampFn} may not 
decrease.
    *
-   * @deprecated use {@link CountingInput#unbounded()} and call
-   *             {@link 
UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
+   * @deprecated use {@link GenerateSequence} and call
+   *             {@link 
GenerateSequence#withTimestampFn(SerializableFunction)} instead
    */
   @Deprecated
   public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
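
Code that needs the raw source rather than the PTransform (DirectGraphVisitorTest earlier in this diff, for example) keeps using the deprecated CountingSource factories through Read. A minimal sketch of that still-supported, if deprecated, path (class name illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.CountingSource;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class CountingSourceSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Compiles and runs, but is deprecated in favor of GenerateSequence.
        PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));

        p.run();
      }
    }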

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
index 189539f..8a83d39 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
@@ -109,6 +109,11 @@ public abstract class GenerateSequence extends 
PTransform<PBegin, PCollection<Lo
         .build();
   }
 
+  /** Specifies to generate the range [from, to). */
+  public static GenerateSequence fromTo(long from, long to) {
+    return from(from).to(to);
+  }
+
   /** Specifies the maximum number to generate (exclusive). */
   public GenerateSequence to(long to) {
     checkArgument(
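
The fromTo helper added above is only shorthand for chaining from(...).to(...), producing the half-open range [from, to). A small sketch of the equivalence, using a subrange like the ones GenerateSequenceTest switches to below (class and step names illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class FromToSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Both produce the longs 10, 11, ..., 999.
        PCollection<Long> viaFromTo = p.apply("viaFromTo", GenerateSequence.fromTo(10, 1000));
        PCollection<Long> viaChain = p.apply("viaChain", GenerateSequence.from(10).to(1000));

        p.run();
      }
    }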

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index b69185b..f89d2aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.values;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.Create;
@@ -39,9 +39,10 @@ import org.apache.beam.sdk.util.WindowingStrategy;
  * be passed as the inputs of other PTransforms.
  *
  * <p>Some root transforms produce bounded {@code PCollections} and others
- * produce unbounded ones. For example, {@link CountingInput#upTo} produces a 
fixed set of integers,
- * so it produces a bounded {@link PCollection}. {@link 
CountingInput#unbounded} produces all
- * integers as an infinite stream, so it produces an unbounded {@link 
PCollection}.
+ * produce unbounded ones. For example, {@link GenerateSequence#fromTo} 
produces a fixed set of
+ * integers, so it produces a bounded {@link PCollection}. {@link 
GenerateSequence#from} without
+ * a {@link GenerateSequence#to} produces all integers as an infinite stream, 
so it produces an
+ * unbounded {@link PCollection}.
  *
  * <p>Each element in a {@link PCollection} has an associated timestamp. 
Readers assign timestamps
  * to elements when they create {@link PCollection PCollections}, and other

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 75cabf2..9604c3f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,9 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -317,8 +315,8 @@ public class PipelineTest {
   @Test
   public void testReplaceAll() {
     pipeline.enableAbandonedNodeEnforcement(false);
-    pipeline.apply(CountingInput.unbounded());
-    pipeline.apply(CountingInput.upTo(100L));
+    pipeline.apply("unbounded", GenerateSequence.from(0));
+    pipeline.apply("bounded", GenerateSequence.fromTo(0, 100));
 
     pipeline.replaceAll(
         ImmutableList.of(
@@ -326,18 +324,18 @@ public class PipelineTest {
                 new PTransformMatcher() {
                   @Override
                   public boolean matches(AppliedPTransform<?, ?, ?> 
application) {
-                    return application.getTransform() instanceof 
UnboundedCountingInput;
+                    return application.getTransform() instanceof 
GenerateSequence;
                   }
                 },
-                new UnboundedCountingInputOverride()),
+                new GenerateSequenceToCreateOverride()),
             PTransformOverride.of(
                 new PTransformMatcher() {
                   @Override
                   public boolean matches(AppliedPTransform<?, ?, ?> 
application) {
-                    return application.getTransform() instanceof 
BoundedCountingInput;
+                    return application.getTransform() instanceof Create.Values;
                   }
                 },
-                new BoundedCountingInputOverride())));
+                new CreateValuesToEmptyFlattenOverride())));
     pipeline.traverseTopologically(
         new PipelineVisitor.Defaults() {
           @Override
@@ -348,9 +346,9 @@ public class PipelineTest {
                   not(
                       anyOf(
                           Matchers.<Class<? extends PTransform>>equalTo(
-                              UnboundedCountingInput.class),
+                              GenerateSequence.class),
                           Matchers.<Class<? extends PTransform>>equalTo(
-                              BoundedCountingInput.class))));
+                              Create.Values.class))));
             }
             return CompositeBehavior.ENTER_TRANSFORM;
           }
@@ -364,7 +362,7 @@ public class PipelineTest {
   @Test
   public void testReplaceAllIncomplete() {
     pipeline.enableAbandonedNodeEnforcement(false);
-    pipeline.apply(CountingInput.unbounded());
+    pipeline.apply(GenerateSequence.from(0));
 
     // The order is such that the output of the second will match the first, 
which is not permitted
     thrown.expect(IllegalStateException.class);
@@ -374,18 +372,18 @@ public class PipelineTest {
                 new PTransformMatcher() {
                   @Override
                   public boolean matches(AppliedPTransform<?, ?, ?> 
application) {
-                    return application.getTransform() instanceof 
BoundedCountingInput;
+                    return application.getTransform() instanceof Create.Values;
                   }
                 },
-                new BoundedCountingInputOverride()),
+                new CreateValuesToEmptyFlattenOverride()),
             PTransformOverride.of(
                 new PTransformMatcher() {
                   @Override
                   public boolean matches(AppliedPTransform<?, ?, ?> 
application) {
-                    return application.getTransform() instanceof 
UnboundedCountingInput;
+                    return application.getTransform() instanceof 
GenerateSequence;
                   }
                 },
-                new UnboundedCountingInputOverride())));
+                new GenerateSequenceToCreateOverride())));
   }
 
   @Test
@@ -455,11 +453,11 @@ public class PipelineTest {
     assertThat(names, not(hasItem("original_application/custom_name2")));
   }
 
-  static class BoundedCountingInputOverride
-      implements PTransformOverrideFactory<PBegin, PCollection<Long>, 
BoundedCountingInput> {
+  static class GenerateSequenceToCreateOverride
+      implements PTransformOverrideFactory<PBegin, PCollection<Long>, 
GenerateSequence> {
     @Override
     public PTransformReplacement<PBegin, PCollection<Long>> 
getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<Long>, BoundedCountingInput> 
transform) {
+        AppliedPTransform<PBegin, PCollection<Long>, GenerateSequence> 
transform) {
       return PTransformReplacement.of(transform.getPipeline().begin(), 
Create.of(0L));
     }
 
@@ -476,18 +474,28 @@ public class PipelineTest {
               TaggedPValue.of(replacement.getKey(), replacement.getValue())));
     }
   }
-  static class UnboundedCountingInputOverride
-      implements PTransformOverrideFactory<PBegin, PCollection<Long>, 
UnboundedCountingInput> {
 
+  private static class EmptyFlatten<T> extends PTransform<PBegin, 
PCollection<T>> {
     @Override
-    public PTransformReplacement<PBegin, PCollection<Long>> 
getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<Long>, UnboundedCountingInput> 
transform) {
-      return PTransformReplacement.of(transform.getPipeline().begin(), 
CountingInput.upTo(100L));
+    public PCollection<T> expand(PBegin input) {
+      PCollectionList<T> empty = PCollectionList.empty(input.getPipeline());
+      return empty.apply(Flatten.<T>pCollections());
+    }
+  }
+
+  static class CreateValuesToEmptyFlattenOverride<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, 
Create.Values<T>> {
+
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> 
getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) 
{
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new EmptyFlatten<T>());
     }
 
     @Override
     public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollection<Long> newOutput) {
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
       Map.Entry<TupleTag<?>, PValue> original = 
Iterables.getOnlyElement(outputs.entrySet());
       Map.Entry<TupleTag<?>, PValue> replacement =
           Iterables.getOnlyElement(newOutput.expand().entrySet());

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
index 49af479..e2c185b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
@@ -65,7 +65,7 @@ public class GenerateSequenceTest {
   @Category(ValidatesRunner.class)
   public void testBoundedInput() {
     long numElements = 1000;
-    PCollection<Long> input = 
p.apply(GenerateSequence.from(0).to(numElements));
+    PCollection<Long> input = p.apply(GenerateSequence.fromTo(0, numElements));
 
     addCountingAsserts(input, 0, numElements);
     p.run();
@@ -74,7 +74,7 @@ public class GenerateSequenceTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testEmptyBoundedInput() {
-    PCollection<Long> input = p.apply(GenerateSequence.from(0).to(0));
+    PCollection<Long> input = p.apply(GenerateSequence.fromTo(0, 0));
 
     PAssert.that(input).empty();
     p.run();
@@ -83,7 +83,7 @@ public class GenerateSequenceTest {
   @Test
   @Category(ValidatesRunner.class)
   public void testEmptyBoundedInputSubrange() {
-    PCollection<Long> input = p.apply(GenerateSequence.from(42).to(42));
+    PCollection<Long> input = p.apply(GenerateSequence.fromTo(42, 42));
 
     PAssert.that(input).empty();
     p.run();
@@ -94,7 +94,7 @@ public class GenerateSequenceTest {
   public void testBoundedInputSubrange() {
     long start = 10;
     long end = 1000;
-    PCollection<Long> input = p.apply(GenerateSequence.from(start).to(end));
+    PCollection<Long> input = p.apply(GenerateSequence.fromTo(start, end));
 
     addCountingAsserts(input, start, end);
     p.run();
@@ -102,7 +102,7 @@ public class GenerateSequenceTest {
 
   @Test
   public void testBoundedDisplayData() {
-    PTransform<?, ?> input = GenerateSequence.from(0).to(1234);
+    PTransform<?, ?> input = GenerateSequence.fromTo(0, 1234);
     DisplayData displayData = DisplayData.from(input);
     assertThat(displayData, hasDisplayItem("from", 0));
     assertThat(displayData, hasDisplayItem("to", 1234));
@@ -110,7 +110,7 @@ public class GenerateSequenceTest {
 
   @Test
   public void testBoundedDisplayDataSubrange() {
-    PTransform<?, ?> input = GenerateSequence.from(12).to(1234);
+    PTransform<?, ?> input = GenerateSequence.fromTo(12, 1234);
     DisplayData displayData = DisplayData.from(input);
     assertThat(displayData, hasDisplayItem("from", 12));
     assertThat(displayData, hasDisplayItem("to", 1234));
@@ -124,7 +124,7 @@ public class GenerateSequenceTest {
     long elemsPerPeriod = 10L;
     Duration periodLength = Duration.millis(8);
     PCollection<Long> input =
-        
p.apply(GenerateSequence.from(0).to(numElements).withRate(elemsPerPeriod, 
periodLength));
+        p.apply(GenerateSequence.fromTo(0, 
numElements).withRate(elemsPerPeriod, periodLength));
 
     addCountingAsserts(input, 0, numElements);
     long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / 
elemsPerPeriod;
@@ -147,7 +147,7 @@ public class GenerateSequenceTest {
     long numElements = 1000;
 
     PCollection<Long> input =
-        p.apply(GenerateSequence.from(0).to(numElements).withTimestampFn(new 
ValueAsTimestampFn()));
+        p.apply(GenerateSequence.fromTo(0, numElements).withTimestampFn(new 
ValueAsTimestampFn()));
     addCountingAsserts(input, 0, numElements);
 
     PCollection<Long> diffs =
@@ -172,7 +172,7 @@ public class GenerateSequenceTest {
         };
 
     PTransform<?, ?> input =
-        
GenerateSequence.from(0).to(1234).withMaxReadTime(maxReadTime).withTimestampFn(timestampFn);
+        GenerateSequence.fromTo(0, 
1234).withMaxReadTime(maxReadTime).withTimestampFn(timestampFn);
 
     DisplayData displayData = DisplayData.from(input);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index afe384d..312a433 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -29,7 +29,7 @@ import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
 import org.apache.beam.sdk.testing.UsesCommittedMetrics;
@@ -37,10 +37,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Rule;
@@ -236,7 +236,7 @@ public class MetricsTest implements Serializable {
   public void testBoundedSourceMetrics() {
     long numElements = 1000;
 
-    PCollection<Long> input = pipeline.apply(CountingInput.upTo(numElements));
+    pipeline.apply(GenerateSequence.fromTo(0, numElements));
 
     PipelineResult pipelineResult = pipeline.run();
 
@@ -257,8 +257,10 @@ public class MetricsTest implements Serializable {
   public void testUnboundedSourceMetrics() {
     long numElements = 1000;
 
-    PCollection<Long> input = pipeline
-        .apply((CountingInput.unbounded()).withMaxNumRecords(numElements));
+
+    // Use withMaxReadTime to force unbounded mode.
+    pipeline.apply(
+        GenerateSequence.fromTo(0, 
numElements).withMaxReadTime(Duration.standardDays(1)));
 
     PipelineResult pipelineResult = pipeline.run();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 3638fc8..1b884e2 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -32,9 +32,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
@@ -254,7 +253,7 @@ public class TransformHierarchyTest implements Serializable 
{
               }
             });
 
-    UnboundedCountingInput genUpstream = CountingInput.unbounded();
+    GenerateSequence genUpstream = GenerateSequence.from(0);
     PCollection<Long> upstream = pipeline.apply(genUpstream);
     PCollection<Long> output = upstream.apply("Original", originalParDo);
     hierarchy.pushNode("Upstream", pipeline.begin(), genUpstream);
@@ -417,7 +416,7 @@ public class TransformHierarchyTest implements Serializable 
{
               }
             });
 
-    UnboundedCountingInput genUpstream = CountingInput.unbounded();
+    GenerateSequence genUpstream = GenerateSequence.from(0);
     PCollection<Long> upstream = pipeline.apply(genUpstream);
     PCollection<Long> output = upstream.apply("Original", originalParDo);
     Node upstreamNode = hierarchy.pushNode("Upstream", pipeline.begin(), 
genUpstream);

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
index 969bbc4..b546158 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.fail;
 
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -54,7 +54,7 @@ public class GatherAllPanesTest implements Serializable {
   @Category(NeedsRunner.class)
   public void singlePaneSingleReifiedPane() {
     PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =
-        p.apply(CountingInput.upTo(20000))
+        p.apply(GenerateSequence.fromTo(0, 20000))
             .apply(
                 WithTimestamps.of(
                     new SerializableFunction<Long, Instant>() {
@@ -95,8 +95,8 @@ public class GatherAllPanesTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void multiplePanesMultipleReifiedPane() {
-    PCollection<Long> someElems = p.apply("someLongs", 
CountingInput.upTo(20000));
-    PCollection<Long> otherElems = p.apply("otherLongs", 
CountingInput.upTo(20000));
+    PCollection<Long> someElems = p.apply("someLongs", 
GenerateSequence.fromTo(0, 20000));
+    PCollection<Long> otherElems = p.apply("otherLongs", 
GenerateSequence.fromTo(0, 20000));
     PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =
         PCollectionList.of(someElems)
             .and(otherElems)

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 3528797..a5362b1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
@@ -425,7 +425,7 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testEmptyFalse() throws Exception {
-    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5));
     PAssert.that("Vals should have been empty", vals).empty();
 
     Throwable thrown = runExpectingAssertionFailure(pipeline);
@@ -439,7 +439,7 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testEmptyFalseDefaultReasonString() throws Exception {
-    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5));
     PAssert.that(vals).empty();
 
     Throwable thrown = runExpectingAssertionFailure(pipeline);
@@ -447,14 +447,14 @@ public class PAssertTest implements Serializable {
     String message = thrown.getMessage();
 
     assertThat(message,
-        
containsString("CountingInput.BoundedCountingInput/Read(BoundedCountingSource).out"));
+        containsString("GenerateSequence/Read(BoundedCountingSource).out"));
     assertThat(message, containsString("Expected: iterable over [] in any order"));
   }
 
   @Test
   @Category(ValidatesRunner.class)
   public void testAssertionSiteIsCapturedWithMessage() throws Exception {
-    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5));
     assertThatCollectionIsEmptyWithMessage(vals);
 
     Throwable thrown = runExpectingAssertionFailure(pipeline);
@@ -473,7 +473,7 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testAssertionSiteIsCapturedWithoutMessage() throws Exception {
-    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
+    PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5));
     assertThatCollectionIsEmptyWithoutMessage(vals);
 
     Throwable thrown = runExpectingAssertionFailure(pipeline);
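
Besides the API swap, the updated expectation above documents a naming consequence: the producing step is now reported as "GenerateSequence/Read(BoundedCountingSource).out" rather than "CountingInput.BoundedCountingInput/Read(BoundedCountingSource).out". A sketch of a test that would surface that name in a PAssert failure message, not part of this commit (hypothetical class and method names; the assertion fails on purpose):

    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    // Hypothetical test class, for illustration only.
    public class GenerateSequenceNamingExample {
      @Rule public final transient TestPipeline pipeline = TestPipeline.create();

      @Test
      public void emptyAssertionNamesGenerateSequenceStep() {
        PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5));
        // Deliberately failing assertion: vals contains 0..4. The resulting failure message
        // refers to the producing step by its GenerateSequence-based name, as asserted above.
        PAssert.that(vals).empty();
        pipeline.run();
      }
    }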

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index a4f2545..fca804b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -138,9 +138,9 @@ public class FlattenTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testFlattenInputMultipleCopies() {
     int count = 5;
-    PCollection<Long> longs = p.apply("mkLines", CountingInput.upTo(count));
+    PCollection<Long> longs = p.apply("mkLines", GenerateSequence.fromTo(0, count));
     PCollection<Long> biggerLongs =
-        p.apply("mkOtherLines", CountingInput.upTo(count))
+        p.apply("mkOtherLines", GenerateSequence.fromTo(0, count))
             .apply(
                 MapElements.via(
                     new SimpleFunction<Long, Long>() {
@@ -175,7 +175,7 @@ public class FlattenTest implements Serializable {
             Create.of(0L, 1L, 2L, 3L, null, 4L, 5L, null, 6L, 7L, 8L, null, 9L)
                 .withCoder(NullableCoder.of(BigEndianLongCoder.of())));
     PCollection<Long> varLongs =
-        p.apply("VarLengthLongs", 
CountingInput.upTo(5L)).setCoder(VarLongCoder.of());
+        p.apply("VarLengthLongs", GenerateSequence.fromTo(0, 
5)).setCoder(VarLongCoder.of());
 
     PCollection<Long> flattened =
         PCollectionList.of(bigEndianLongs)

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 589c744..3424f86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -64,7 +64,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -889,7 +889,7 @@ public class ParDoTest implements Serializable {
   @Test
   public void testMultiOutputAppliedMultipleTimesDifferentOutputs() {
     pipeline.enableAbandonedNodeEnforcement(false);
-    PCollection<Long> longs = pipeline.apply(CountingInput.unbounded());
+    PCollection<Long> longs = pipeline.apply(GenerateSequence.from(0));
 
     TupleTag<Long> mainOut = new TupleTag<>();
     final TupleTag<String> valueAsString = new TupleTag<>();
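
This is the unbounded half of the migration: with no upper bound, GenerateSequence.from(0) yields an unbounded PCollection, standing in for CountingInput.unbounded(). A minimal sketch, not part of this commit (hypothetical class name; the pipeline is only constructed, not run, since an uncapped unbounded source would not terminate locally):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical example class, for illustration only.
    public class UnboundedSequenceExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Unbounded PCollection of 0, 1, 2, ..., the replacement for CountingInput.unbounded().
        PCollection<Long> longs = p.apply(GenerateSequence.from(0));
        System.out.println(longs.isBounded());  // prints UNBOUNDED
      }
    }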

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 8bf022b..9fe7ec3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -319,7 +319,7 @@ public class WindowTest implements Serializable {
 
     final PCollection<Long> initialWindows =
         pipeline
-            .apply(CountingInput.upTo(10L))
+            .apply(GenerateSequence.fromTo(0, 10))
             .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
 
     // Sanity check the window assignment to demonstrate the baseline

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
index 76cba01..f687637 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
@@ -30,10 +30,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -61,34 +62,36 @@ public class PCollectionListTest {
   public void testIterationOrder() {
     Pipeline p = TestPipeline.create();
     PCollection<Long> createOne = p.apply("CreateOne", Create.of(1L, 2L, 3L));
-    PCollection<Long> boundedCount = p.apply("CountBounded", CountingInput.upTo(23L));
-    PCollection<Long> unboundedCount = p.apply("CountUnbounded", CountingInput.unbounded());
+    PCollection<Long> boundedCount = p.apply("CountBounded", GenerateSequence.fromTo(0, 23));
+    PCollection<Long> unboundedCount = p.apply("CountUnbounded", GenerateSequence.from(0));
     PCollection<Long> createTwo = p.apply("CreateTwo", Create.of(-1L, -2L));
-    PCollection<Long> maxRecordsCount =
-        p.apply("CountLimited", 
CountingInput.unbounded().withMaxNumRecords(22L));
+    PCollection<Long> maxReadTimeCount =
+        p.apply(
+            "CountLimited", 
GenerateSequence.from(0).withMaxReadTime(Duration.standardSeconds(5)));
 
     ImmutableList<PCollection<Long>> counts =
-        ImmutableList.of(boundedCount, maxRecordsCount, unboundedCount);
+        ImmutableList.of(boundedCount, maxReadTimeCount, unboundedCount);
     // Build a PCollectionList from a list. This should have the same order as the input list.
     PCollectionList<Long> pcList = PCollectionList.of(counts);
     // Contains is the order-dependent matcher
     assertThat(
         pcList.getAll(),
-        contains(boundedCount, maxRecordsCount, unboundedCount));
+        contains(boundedCount, maxReadTimeCount, unboundedCount));
 
     // A list that is expanded with builder methods has the added value at the end
     PCollectionList<Long> withOneCreate = pcList.and(createTwo);
     assertThat(
-        withOneCreate.getAll(), contains(boundedCount, maxRecordsCount, unboundedCount, createTwo));
+        withOneCreate.getAll(),
+        contains(boundedCount, maxReadTimeCount, unboundedCount, createTwo));
 
     // Lists that are built entirely from the builder return outputs in the order they were added
     PCollectionList<Long> fromEmpty =
         PCollectionList.<Long>empty(p)
             .and(unboundedCount)
             .and(createOne)
-            .and(ImmutableList.of(boundedCount, maxRecordsCount));
+            .and(ImmutableList.of(boundedCount, maxReadTimeCount));
     assertThat(
-        fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxRecordsCount));
+        fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxReadTimeCount));
 
     Map<TupleTag<?>, PValue> expansion = fromEmpty.expand();
     // Tag->PValue mappings are stable between expansions. They don't need to be stable across
@@ -96,7 +99,7 @@ public class PCollectionListTest {
     assertThat(expansion, equalTo(fromEmpty.expand()));
 
     List<PCollection<Long>> expectedList =
-        ImmutableList.of(unboundedCount, createOne, boundedCount, maxRecordsCount);
+        ImmutableList.of(unboundedCount, createOne, boundedCount, maxReadTimeCount);
     assertThat(expansion.values(), containsInAnyOrder(expectedList.toArray()));
   }
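
Note the behavioral tweak in this test: the third source is no longer capped at 22 records via withMaxNumRecords but at 5 seconds of read time via withMaxReadTime, and the variable is renamed accordingly. A minimal sketch of the time-capped pattern, not part of this commit (hypothetical class name; only the from/withMaxReadTime calls are taken from the diff, and a local runner such as the DirectRunner is assumed):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Hypothetical example class, for illustration only.
    public class TimeCappedSequenceExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // An otherwise unbounded sequence whose read is capped at 5 seconds of wall-clock time,
        // mirroring the "CountLimited" source in PCollectionListTest above.
        PCollection<Long> capped =
            p.apply("CountLimited",
                GenerateSequence.from(0).withMaxReadTime(Duration.standardSeconds(5)));
        p.run().waitUntilFinish();
      }
    }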
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 9df0512..cc8761b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -108,7 +108,7 @@ public final class PCollectionTupleTest implements Serializable {
   public void testEquals() {
     TestPipeline p = TestPipeline.create();
     TupleTag<Long> longTag = new TupleTag<>();
-    PCollection<Long> longs = p.apply(CountingInput.unbounded());
+    PCollection<Long> longs = p.apply(GenerateSequence.from(0));
     TupleTag<String> strTag = new TupleTag<>();
     PCollection<String> strs = p.apply(Create.of("foo", "bar"));
 
@@ -135,7 +135,7 @@ public final class PCollectionTupleTest implements Serializable {
     TupleTag<Long> longTag = new TupleTag<>();
 
     Pipeline p = TestPipeline.create();
-    PCollection<Long> longs = p.apply(CountingInput.upTo(100L));
+    PCollection<Long> longs = p.apply(GenerateSequence.fromTo(0, 100));
     PCollection<String> strs = p.apply(Create.of("foo", "bar", "baz"));
     PCollection<Integer> ints = longs.apply(MapElements.via(new SimpleFunction<Long, Integer>() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 8e1632f..e11dd74 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -75,8 +75,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -1040,7 +1040,7 @@ public class BigQueryIOTest implements Serializable {
     PCollection<TableRow> tableRows;
     if (unbounded) {
       tableRows =
-          p.apply(CountingInput.unbounded())
+          p.apply(GenerateSequence.from(0))
               .apply(
                   MapElements.via(
                       new SimpleFunction<Long, TableRow>() {
@@ -1090,7 +1090,7 @@ public class BigQueryIOTest implements Serializable {
     tableRef.setTableId("sometable");
 
     PCollection<TableRow> tableRows =
-        p.apply(CountingInput.unbounded())
+        p.apply(GenerateSequence.from(0))
         .apply(
             MapElements.via(
                 new SimpleFunction<Long, TableRow>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 240fb31..4ba77c9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -43,7 +43,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -106,7 +106,7 @@ public class BigtableWriteIT implements Serializable {
     createEmptyTable(instanceName, tableId);
 
     Pipeline p = Pipeline.create(options);
-    p.apply(CountingInput.upTo(numRows))
+    p.apply(GenerateSequence.fromTo(0, numRows))
         .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
           @ProcessElement
           public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
index 0fcd9d3..322aecd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.UUID;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -66,7 +66,7 @@ public class V1WriteIT {
     Pipeline p = Pipeline.create(options);
 
     // Write to datastore
-    p.apply(CountingInput.upTo(numEntities))
+    p.apply(GenerateSequence.fromTo(0, numEntities))
         .apply(ParDo.of(new CreateEntityFn(
             options.getKind(), options.getNamespace(), ancestor)))
         .apply(DatastoreIO.v1().write().withProjectId(project));
