Repository: beam
Updated Branches:
  refs/heads/master 62f041e56 -> 781c15522


Introduces GenerateSequence transform

It is a replacement for CountingInput, which will be deprecated.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/88c66129
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/88c66129
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/88c66129

Branch: refs/heads/master
Commit: 88c66129ba0cff9c8319f21ad317597d9bd8b5cd
Parents: 62f041e
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Tue Apr 18 16:48:38 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Apr 21 16:53:49 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/CountingInput.java   |   2 +-
 .../org/apache/beam/sdk/io/CountingSource.java  |   4 +-
 .../apache/beam/sdk/io/GenerateSequence.java    | 194 +++++++++++++++++++
 .../apache/beam/sdk/io/CountingSourceTest.java  |   4 +-
 .../beam/sdk/io/GenerateSequenceTest.java       | 194 +++++++++++++++++++
 5 files changed, 393 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 72ebd97..ab006d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -247,7 +247,7 @@ public class CountingInput {
     public PCollection<Long> expand(PBegin begin) {
       Unbounded<Long> read =
           Read.from(
-              CountingSource.createUnbounded()
+              CountingSource.createUnboundedFrom(0)
                   .withTimestampFn(timestampFn)
                   .withRate(elementsPerPeriod, period));
       if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 73b663d..dd018f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -103,8 +103,8 @@ public class CountingSource {
    * Create a new {@link UnboundedCountingSource}.
    */
   // package-private to return a typed UnboundedCountingSource rather than the 
UnboundedSource type.
-  static UnboundedCountingSource createUnbounded() {
-    return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new 
NowTimestampFn());
+  static UnboundedCountingSource createUnboundedFrom(long start) {
+    return new UnboundedCountingSource(start, 1, 1L, Duration.ZERO, new 
NowTimestampFn());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
new file mode 100644
index 0000000..189539f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} that produces longs starting from the given value, and either up to the
+ * given limit or until {@link Long#MAX_VALUE} / until the given time elapses.
+ *
+ * <p>When only {@link #from(long)} and {@link #to(long)} are specified, {@link GenerateSequence}
+ * is implemented based on {@link OffsetBasedSource} and
+ * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
+ * supports dynamic work rebalancing.
+ *
+ * <p>To produce a bounded {@code PCollection<Long>}:
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ * PCollection<Long> bounded = p.apply(GenerateSequence.from(0).to(1000));
+ * }</pre>
+ *
+ * <p>To produce an unbounded {@code PCollection<Long>}, simply do not specify {@link #to(long)},
+ * calling {@link #withTimestampFn(SerializableFunction)} to provide values with timestamps other
+ * than {@link Instant#now}.
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ *
+ * // To use processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(GenerateSequence.from(0));
+ * // Or, to use a provided function to set the element timestamp.
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(GenerateSequence.from(0).withTimestampFn(someFn));
+ * }</pre>
+ *
+ * <p>If {@link #to(long)} is combined with {@link #withTimestampFn(SerializableFunction)},
+ * {@link #withRate(long, Duration)} or {@link #withMaxReadTime(Duration)}, the values are read
+ * from the unbounded counting source, limited to {@code to - from} records.
+ *
+ * <p>In all cases, the sequence of numbers is generated in parallel, so there is no inherent
+ * ordering between the generated values - it is only guaranteed that all values in the given range
+ * will be present in the resulting {@link PCollection}.
+ */
+@AutoValue
+public abstract class GenerateSequence extends PTransform<PBegin, PCollection<Long>> {
+  /** First value of the sequence (inclusive). */
+  abstract long getFrom();
+
+  /** End of the sequence (exclusive); negative ({@code -1} by default) when there is no end. */
+  abstract long getTo();
+
+  /** Function assigning timestamps to elements, or {@code null} if none was specified. */
+  @Nullable
+  abstract SerializableFunction<Long, Instant> getTimestampFn();
+
+  /** Number of elements allowed per {@link #getPeriod()}; {@code 0} when no rate was set. */
+  abstract long getElementsPerPeriod();
+
+  /** Length of the rate-limiting period, or {@code null} when no rate was set. */
+  @Nullable
+  abstract Duration getPeriod();
+
+  /** Maximum time to keep generating elements, or {@code null} when unlimited. */
+  @Nullable
+  abstract Duration getMaxReadTime();
+
+  abstract Builder toBuilder();
+
+  /** AutoValue builder used by the fluent configuration methods of {@link GenerateSequence}. */
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setFrom(long from);
+
+    abstract Builder setTo(long to);
+
+    abstract Builder setTimestampFn(SerializableFunction<Long, Instant> timestampFn);
+
+    abstract Builder setElementsPerPeriod(long elementsPerPeriod);
+
+    abstract Builder setPeriod(Duration period);
+
+    abstract Builder setMaxReadTime(Duration maxReadTime);
+
+    abstract GenerateSequence build();
+  }
+
+  /**
+   * Specifies the minimum number to generate (inclusive).
+   *
+   * <p>The returned transform has no upper bound and no rate limit until they are configured.
+   *
+   * @throws IllegalArgumentException if {@code from} is negative
+   */
+  public static GenerateSequence from(long from) {
+    checkArgument(from >= 0, "Value of from must be non-negative, but was: %s", from);
+    return new AutoValue_GenerateSequence.Builder()
+        .setFrom(from)
+        .setTo(-1)
+        .setElementsPerPeriod(0)
+        .build();
+  }
+
+  /**
+   * Specifies the maximum number to generate (exclusive).
+   *
+   * <p>Passing {@code -1} keeps the sequence unbounded.
+   *
+   * @throws IllegalArgumentException if {@code to} is less than the start of the range
+   */
+  public GenerateSequence to(long to) {
+    // Validate the bound being requested, not the one stored so far: the stored value is -1
+    // until this method has been called, so checking it would accept any argument.
+    checkArgument(to == -1 || to >= getFrom(), "Degenerate range [%s, %s)", getFrom(), to);
+    return toBuilder().setTo(to).build();
+  }
+
+  /** Specifies the function to use to assign timestamps to the elements. */
+  public GenerateSequence withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
+    return toBuilder().setTimestampFn(timestampFn).build();
+  }
+
+  /**
+   * Specifies to generate at most a given number of elements per a given period.
+   *
+   * @throws IllegalArgumentException if {@code numElements} is not positive
+   * @throws NullPointerException if {@code periodLength} is null
+   */
+  public GenerateSequence withRate(long numElements, Duration periodLength) {
+    checkArgument(
+        numElements > 0,
+        "Number of elements in withRate must be positive, but was: %s",
+        numElements);
+    checkNotNull(periodLength, "periodLength in withRate must be non-null");
+    return toBuilder().setElementsPerPeriod(numElements).setPeriod(periodLength).build();
+  }
+
+  /** Specifies to stop generating elements after the given time. */
+  public GenerateSequence withMaxReadTime(Duration maxReadTime) {
+    return toBuilder().setMaxReadTime(maxReadTime).build();
+  }
+
+  /**
+   * Expands into a read from the bounded counting source when only a finite range was requested,
+   * and otherwise into a read from the unbounded counting source, optionally limited by record
+   * count and/or read time.
+   */
+  @Override
+  public PCollection<Long> expand(PBegin input) {
+    // A negative upper bound (the default is -1) means that to() was never specified.
+    boolean isRangeUnbounded = getTo() < 0;
+    boolean usesUnboundedFeatures =
+        getTimestampFn() != null || getElementsPerPeriod() > 0 || getMaxReadTime() != null;
+    if (!isRangeUnbounded && !usesUnboundedFeatures) {
+      // This is the only case when we can use the bounded CountingSource.
+      return input.apply(Read.from(CountingSource.createSourceForSubrange(getFrom(), getTo())));
+    }
+
+    CountingSource.UnboundedCountingSource source = CountingSource.createUnboundedFrom(getFrom());
+    if (getTimestampFn() != null) {
+      source = source.withTimestampFn(getTimestampFn());
+    }
+    if (getElementsPerPeriod() > 0) {
+      source = source.withRate(getElementsPerPeriod(), getPeriod());
+    }
+
+    Read.Unbounded<Long> readUnbounded = Read.from(source);
+
+    // A finite range read from the unbounded source is capped at (to - from) records.
+    if (getMaxReadTime() == null) {
+      if (isRangeUnbounded) {
+        return input.apply(readUnbounded);
+      } else {
+        return input.apply(readUnbounded.withMaxNumRecords(getTo() - getFrom()));
+      }
+    } else {
+      BoundedReadFromUnboundedSource<Long> withMaxReadTime =
+          readUnbounded.withMaxReadTime(getMaxReadTime());
+      if (isRangeUnbounded) {
+        return input.apply(withMaxReadTime);
+      } else {
+        return input.apply(withMaxReadTime.withMaxNumRecords(getTo() - getFrom()));
+      }
+    }
+  }
+
+  /**
+   * Registers the configured range, timestamp function, read-time limit and rate as display
+   * data. The upper bound is omitted while it still has its default value of {@code -1}, and the
+   * rate items are only added when a rate was set.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    builder.add(DisplayData.item("from", getFrom()).withLabel("Generate sequence from"));
+    builder.addIfNotDefault(
+        DisplayData.item("to", getTo()).withLabel("Generate sequence to (exclusive)"), -1L);
+    builder.addIfNotNull(
+        DisplayData.item(
+                "timestampFn", getTimestampFn() == null ? null : getTimestampFn().getClass())
+            .withLabel("Timestamp Function"));
+    builder.addIfNotNull(
+        DisplayData.item("maxReadTime", getMaxReadTime()).withLabel("Maximum Read Time"));
+    if (getElementsPerPeriod() > 0) {
+      builder.add(
+          DisplayData.item("elementsPerPeriod", getElementsPerPeriod())
+              .withLabel("Elements per period"));
+      builder.add(DisplayData.item("period", getPeriod()).withLabel("Period"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 8807164..d56160a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -202,7 +202,7 @@ public class CountingSourceTest {
     PCollection<Long> input =
         p.apply(
             Read.from(
-                    CountingSource.createUnbounded()
+                    CountingSource.createUnboundedFrom(0)
                         .withTimestampFn(new ValueAsTimestampFn())
                         .withRate(1, period))
                 .withMaxNumRecords(numElements));
@@ -260,7 +260,7 @@ public class CountingSourceTest {
     int numSplits = 10;
 
     UnboundedCountingSource initial =
-        CountingSource.createUnbounded().withRate(elementsPerPeriod, period);
+        CountingSource.createUnboundedFrom(0).withRate(elementsPerPeriod, 
period);
     List<? extends UnboundedSource<Long, ?>> splits =
         initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());

http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
new file mode 100644
index 0000000..49af479
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit and runner-validation tests for {@link GenerateSequence}. */
+@RunWith(JUnit4.class)
+public class GenerateSequenceTest {
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  /**
+   * Attaches assertions on the element count, the distinct element count, the minimum and the
+   * maximum of {@code input}, expecting it to hold the longs in {@code [start, end)}.
+   */
+  public static void addCountingAsserts(PCollection<Long> input, long start, long end) {
+    long expectedSize = end - start;
+
+    // Total number of elements.
+    PCollection<Long> total = input.apply("Count", Count.<Long>globally());
+    PAssert.thatSingleton(total).isEqualTo(expectedSize);
+
+    // Number of distinct elements.
+    PCollection<Long> distinctTotal =
+        input.apply(Distinct.<Long>create()).apply("UniqueCount", Count.<Long>globally());
+    PAssert.thatSingleton(distinctTotal).isEqualTo(expectedSize);
+
+    // Smallest element is the inclusive start.
+    PCollection<Long> smallest = input.apply("Min", Min.<Long>globally());
+    PAssert.thatSingleton(smallest).isEqualTo(start);
+
+    // Largest element is one below the exclusive end.
+    PCollection<Long> largest = input.apply("Max", Max.<Long>globally());
+    PAssert.thatSingleton(largest).isEqualTo(end - 1);
+  }
+
+  /** A finite range starting at zero yields every value of the range. */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testBoundedInput() {
+    long total = 1000;
+    GenerateSequence sequence = GenerateSequence.from(0).to(total);
+    PCollection<Long> numbers = p.apply(sequence);
+
+    addCountingAsserts(numbers, 0, total);
+    p.run();
+  }
+
+  /** The range [0, 0) produces no elements. */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testEmptyBoundedInput() {
+    GenerateSequence sequence = GenerateSequence.from(0).to(0);
+    PCollection<Long> numbers = p.apply(sequence);
+
+    PAssert.that(numbers).empty();
+    p.run();
+  }
+
+  /** An empty range that does not start at zero produces no elements either. */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testEmptyBoundedInputSubrange() {
+    GenerateSequence sequence = GenerateSequence.from(42).to(42);
+    PCollection<Long> numbers = p.apply(sequence);
+
+    PAssert.that(numbers).empty();
+    p.run();
+  }
+
+  /** A finite range with a non-zero start yields every value of the range. */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testBoundedInputSubrange() {
+    long first = 10;
+    long limit = 1000;
+    GenerateSequence sequence = GenerateSequence.from(first).to(limit);
+    PCollection<Long> numbers = p.apply(sequence);
+
+    addCountingAsserts(numbers, first, limit);
+    p.run();
+  }
+
+  /** Both ends of a range starting at zero are reported as display data. */
+  @Test
+  public void testBoundedDisplayData() {
+    DisplayData displayData = DisplayData.from(GenerateSequence.from(0).to(1234));
+
+    assertThat(displayData, hasDisplayItem("from", 0));
+    assertThat(displayData, hasDisplayItem("to", 1234));
+  }
+
+  /** Both ends of a range with a non-zero start are reported as display data. */
+  @Test
+  public void testBoundedDisplayDataSubrange() {
+    DisplayData displayData = DisplayData.from(GenerateSequence.from(12).to(1234));
+
+    assertThat(displayData, hasDisplayItem("from", 12));
+    assertThat(displayData, hasDisplayItem("to", 1234));
+  }
+
+  /** A rate-limited sequence takes longer to run than the time implied by its rate. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnboundedInputRate() {
+    long total = 5000;
+    long perPeriod = 10L;
+    Duration period = Duration.millis(8);
+    // Wall-clock time implied by the configured rate for the requested number of elements.
+    long minRuntimeMillis = (period.getMillis() * total) / perPeriod;
+
+    GenerateSequence rateLimited = GenerateSequence.from(0).to(total).withRate(perPeriod, period);
+    PCollection<Long> numbers = p.apply(rateLimited);
+    addCountingAsserts(numbers, 0, total);
+
+    Instant before = Instant.now();
+    p.run();
+    Instant after = Instant.now();
+
+    Instant earliestFinish = before.plus(minRuntimeMillis);
+    assertThat(after.isAfter(earliestFinish), is(true));
+  }
+
+  /** With {@link ValueAsTimestampFn}, every element's timestamp equals the element itself. */
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testUnboundedInputTimestamps() {
+    long total = 1000;
+
+    GenerateSequence timestamped =
+        GenerateSequence.from(0).to(total).withTimestampFn(new ValueAsTimestampFn());
+    PCollection<Long> numbers = p.apply(timestamped);
+    addCountingAsserts(numbers, 0, total);
+
+    PCollection<Long> offsets = numbers.apply("TimestampDiff", ParDo.of(new ElementValueDiff()));
+    PCollection<Long> distinctOffsets =
+        offsets.apply("DistinctTimestamps", Distinct.<Long>create());
+    // A singleton assertion: it also verifies that only one distinct offset exists.
+    PAssert.thatSingleton(distinctOffsets).isEqualTo(0L);
+
+    p.run();
+  }
+
+  /** The read-time limit and the timestamp function class are reported as display data. */
+  @Test
+  public void testUnboundedDisplayData() {
+    Duration readLimit = Duration.standardHours(5);
+    SerializableFunction<Long, Instant> nowFn =
+        new SerializableFunction<Long, Instant>() {
+          @Override
+          public Instant apply(Long input) {
+            return Instant.now();
+          }
+        };
+
+    PTransform<?, ?> transform =
+        GenerateSequence.from(0).to(1234).withMaxReadTime(readLimit).withTimestampFn(nowFn);
+    DisplayData displayData = DisplayData.from(transform);
+
+    assertThat(displayData, hasDisplayItem("maxReadTime", readLimit));
+    assertThat(displayData, hasDisplayItem("timestampFn", nowFn.getClass()));
+  }
+
+  /** Emits, for each element, the element minus its timestamp in milliseconds. */
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      long value = c.element();
+      long timestampMillis = c.timestamp().getMillis();
+      c.output(value - timestampMillis);
+    }
+  }
+
+  /**
+   * A timestamp function that turns each element into an {@link Instant} built from that value,
+   * so an element and its timestamp in milliseconds coincide.
+   */
+  private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
+    @Override
+    public Instant apply(Long input) {
+      return new Instant(input);
+    }
+  }
+}

Reply via email to