Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 64b493a89 -> c79aa4c43


[BEAM-2095] Made SourceRDD hasNext idempotent


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17da5e93
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17da5e93
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17da5e93

Branch: refs/heads/release-2.0.0
Commit: 17da5e933b0cb4850a744a42f4a158fa00b230ab
Parents: 64b493a
Author: Stas Levin <stasle...@apache.org>
Authored: Mon May 1 07:30:49 2017 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Fri May 12 10:00:46 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/io/SourceRDD.java | 173 ++++++++++++-------
 .../spark/io/ReaderToIteratorAdapterTest.java   | 145 ++++++++++++++++
 2 files changed, 260 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/17da5e93/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 7b7d216..01cc176 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -19,12 +19,15 @@
 package org.apache.beam.runners.spark.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
@@ -47,6 +50,7 @@ import org.apache.spark.rdd.RDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.JavaConversions;
 
 /**
  * Classes implementing Beam {@link Source} {@link RDD}s.
@@ -118,80 +122,163 @@ public class SourceRDD {
       }
     }
 
+    private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
+      try {
+        return ((BoundedSource<T>) partition.source).createReader(
+            runtimeContext.getPipelineOptions());
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
+      }
+    }
+
     @Override
     public scala.collection.Iterator<WindowedValue<T>> compute(final Partition split,
-                                                               TaskContext context) {
+                                                               final TaskContext context) {
       final MetricsContainer metricsContainer = metricsAccum.localValue().getContainer(stepName);
 
-      final Iterator<WindowedValue<T>> iter = new Iterator<WindowedValue<T>>() {
-        @SuppressWarnings("unchecked")
-        SourcePartition<T> partition = (SourcePartition<T>) split;
-        BoundedSource.BoundedReader<T> reader = createReader(partition);
-
-        private boolean finished = false;
-        private boolean started = false;
-        private boolean closed = false;
-
-        @Override
-        public boolean hasNext() {
-          // Add metrics container to the scope of org.apache.beam.sdk.io.Source.Reader methods
-          // since they may report metrics.
-          try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
-            try {
-              if (!started) {
-                started = true;
-                finished = !reader.start();
-              } else {
-                finished = !reader.advance();
-              }
-              if (finished) {
-                // safely close the reader if there are no more elements left to read.
-                closeIfNotClosed();
-              }
-              return !finished;
-            } catch (IOException e) {
-              closeIfNotClosed();
-              throw new RuntimeException("Failed to read from reader.", e);
+      @SuppressWarnings("unchecked")
+      final BoundedSource.BoundedReader<T> reader = createReader((SourcePartition<T>) split);
+
+      final Iterator<WindowedValue<T>> readerIterator =
+          new ReaderToIteratorAdapter<>(metricsContainer, reader);
+
+      return new InterruptibleIterator<>(context, JavaConversions.asScalaIterator(readerIterator));
+    }
+
+    /**
+     * Exposes an <code>Iterator</code>&lt;{@link WindowedValue}&gt; interface on top of a
+     * {@link Source.Reader}.
+     * <p>
+     *   <code>hasNext</code> is idempotent and returns <code>true</code> iff further items are
+     *   available for reading using the underlying reader.
+     *   Consequently, when the reader is closed, or when the reader has no further elements
+     *   available (i.e., {@link Source.Reader#advance()} returned <code>false</code>),
+     *   <code>hasNext</code> returns <code>false</code>.
+     * </p>
+     * <p>
+     *   The underlying reader is closed once it is drained, and also if reading from it fails;
+     *   in the latter case the failure is rethrown and no further elements are reported.
+     * </p>
+     * <p>
+     *   Since this is a read-only iterator, an attempt to call <code>remove</code> will throw an
+     *   <code>UnsupportedOperationException</code>.
+     * </p>
+     */
+    @VisibleForTesting
+    static class ReaderToIteratorAdapter<T> implements Iterator<WindowedValue<T>> {
+
+      private static final boolean FAILED_TO_OBTAIN_NEXT = false;
+      private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true;
+
+      private final MetricsContainer metricsContainer;
+      private final Source.Reader<T> reader;
+
+      private boolean started = false;
+      private boolean closed = false;
+      private WindowedValue<T> next = null;
+
+      ReaderToIteratorAdapter(final MetricsContainer metricsContainer,
+                              final Source.Reader<T> reader) {
+        this.metricsContainer = metricsContainer;
+        this.reader = reader;
+      }
+
+      /**
+       * Buffers the next element of the reader into {@code next} and returns whether one was
+       * available. Closes the reader once it is drained, or if reading from it fails.
+       */
+      private boolean tryProduceNext() {
+        // Source.Reader methods may report metrics, so run them within the step's container.
+        try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+          if (closed) {
+            return FAILED_TO_OBTAIN_NEXT;
+          } else {
+            checkState(next == null, "unexpected non-null value for next");
+            if (seekNext()) {
+              next = WindowedValue.timestampedValueInGlobalWindow(
+                  reader.getCurrent(), reader.getCurrentTimestamp());
+              return SUCCESSFULLY_OBTAINED_NEXT;
+            } else {
+              close();
+              return FAILED_TO_OBTAIN_NEXT;
             }
-          } catch (IOException e) {
-            throw new RuntimeException(e);
           }
+        } catch (final Exception e) {
+          // Close a failed reader so it is not leaked or advanced again by later calls.
+          closeAfterFailure(e);
+          throw new RuntimeException("Failed to read data.", e);
+        }
+      }
+
+      /** Marks this iterator as closed and closes the underlying reader. */
+      private void close() {
+        closed = true;
+        try {
+          reader.close();
+        } catch (final IOException e) {
+          throw new RuntimeException(e);
         }
+      }
+
+      /**
+       * Closes the reader after a read failure, unless it is already closed. A failure to close
+       * is attached to {@code cause} as a suppressed exception rather than masking it.
+       */
+      private void closeAfterFailure(final Exception cause) {
+        if (!closed) {
+          closed = true;
+          try {
+            reader.close();
+          } catch (final Exception closeFailure) {
+            cause.addSuppressed(closeFailure);
+          }
+        }
+      }
 
-        @Override
-        public WindowedValue<T> next() {
-          return WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(),
-              reader.getCurrentTimestamp());
+      /** Starts the reader on the first call; advances it (unless closed) afterwards. */
+      private boolean seekNext() throws IOException {
+        if (!started) {
+          started = true;
+          return reader.start();
+        } else {
+          return !closed && reader.advance();
         }
+      }
 
-        @Override
-        public void remove() {
-          throw new UnsupportedOperationException("Remove from partition iterator is not allowed.");
+      /** Hands out the buffered element, throwing if none is buffered. */
+      private WindowedValue<T> consumeCurrent() {
+        if (next == null) {
+          throw new NoSuchElementException();
+        } else {
+          final WindowedValue<T> current = next;
+          next = null;
+          return current;
         }
+      }
 
-        private void closeIfNotClosed() {
-          if (!closed) {
-            closed = true;
-            try {
-              reader.close();
-            } catch (IOException e) {
-              throw new RuntimeException("Failed to close Reader.", e);
-            }
-          }
+      /** Buffers the next element if needed, then hands it out. */
+      private WindowedValue<T> consumeNext() {
+        if (next == null) {
+          tryProduceNext();
         }
-      };
+        return consumeCurrent();
+      }
 
-      return new InterruptibleIterator<>(context,
-          scala.collection.JavaConversions.asScalaIterator(iter));
-    }
+      @Override
+      public boolean hasNext() {
+        return next != null || tryProduceNext();
+      }
 
-    private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
-      try {
-        return ((BoundedSource<T>) partition.source).createReader(
-            runtimeContext.getPipelineOptions());
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
+      @Override
+      public WindowedValue<T> next() {
+        return consumeNext();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
       }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/17da5e93/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
new file mode 100644
index 0000000..5728fa0
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.io.Source;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test for {@link SourceRDD.Bounded.ReaderToIteratorAdapter}.
+ */
+public class ReaderToIteratorAdapterTest {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * A reader over the integers in [{@link #START}, {@link #LIMIT}) that fails fast, via
+   * {@code checkState}, if it is used after being drained or closed.
+   */
+  private static class TestReader extends Source.Reader<Integer> {
+
+    static final int LIMIT = 4;
+    static final int START = 1;
+
+    private int current = START - 1;
+    private boolean closed = false;
+    private boolean drained = false;
+
+    boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      checkState(!drained && !closed);
+      drained = ++current >= LIMIT;
+      return !drained;
+    }
+
+    @Override
+    public Integer getCurrent() throws NoSuchElementException {
+      checkState(!drained && !closed);
+      return current;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      checkState(!drained && !closed);
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+      checkState(!closed);
+      closed = true;
+    }
+
+    @Override
+    public Source<Integer> getCurrentSource() {
+      return null;
+    }
+  }
+
+  private final TestReader testReader = new TestReader();
+
+  private final SourceRDD.Bounded.ReaderToIteratorAdapter<Integer> readerIterator =
+      new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainerImpl(""), testReader);
+
+  /** Consumes elements via next() and checks they are exactly the values in [start, end). */
+  private void assertReaderRange(final int start, final int end) {
+    for (int i = start; i < end; i++) {
+      assertThat(readerIterator.next().getValue(), is(i));
+    }
+  }
+
+  @Test
+  public void testReaderIsClosedAfterDrainage() throws Exception {
+    assertReaderRange(TestReader.START, TestReader.LIMIT);
+
+    // reader is closed only after hasNext realises there are no more elements
+    assertThat(testReader.isClosed(), is(false));
+    assertThat(readerIterator.hasNext(), is(false));
+    assertThat(testReader.isClosed(), is(true));
+  }
+
+  @Test
+  public void testNextWhenDrainedThrows() throws Exception {
+    assertReaderRange(TestReader.START, TestReader.LIMIT);
+
+    exception.expect(NoSuchElementException.class);
+    readerIterator.next();
+  }
+
+  @Test
+  public void testHasNextIdempotencyCombo() throws Exception {
+    assertThat(readerIterator.hasNext(), is(true));
+    assertThat(readerIterator.hasNext(), is(true));
+
+    assertThat(readerIterator.next().getValue(), is(1));
+
+    assertThat(readerIterator.hasNext(), is(true));
+    assertThat(readerIterator.hasNext(), is(true));
+    assertThat(readerIterator.hasNext(), is(true));
+
+    assertThat(readerIterator.next().getValue(), is(2));
+    assertThat(readerIterator.next().getValue(), is(3));
+
+    // drained
+
+    assertThat(readerIterator.hasNext(), is(false));
+    assertThat(readerIterator.hasNext(), is(false));
+
+    // no next to give
+
+    exception.expect(NoSuchElementException.class);
+    readerIterator.next();
+  }
+
+  @Test
+  public void testRemoveIsUnsupported() throws Exception {
+    exception.expect(UnsupportedOperationException.class);
+    readerIterator.remove();
+  }
+
+}

Reply via email to