igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r872493374


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))

Review Comment:
   why the triple copy? source.getRanges() -> RowRange[] -> collect() -> 
RowSet.addAll..?
   
   This would be cleaner and a lot more efficient
   
   ```java
   RowSet.Builder rowSetBuilder = RowSet.newBuilder();
   
   for (Range beamRange : source.getRanges()) {
     RowRange btRange = rowSetBuilder.newRangeBuilder();
     btRange.setStartKey(..).setEndKey(...)
   }
   RowSet rowSet = rowSetBuilder.build();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());

Review Comment:
   pass this as a constructor arg



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;

Review Comment:
   static looks wrong here



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;

Review Comment:
   Nullable



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {

Review Comment:
   private



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -178,4 +554,59 @@ private void verifyMetricWasSet(String method, String 
status, long count) {
         (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
     assertEquals(count, (long) container.getCounter(name).getCumulative());
   }
+
+  public Answer<ScanHandler> mockReadRowsAnswer(List<FlatRow> rows) {
+    return new Answer<ScanHandler>() {
+      @Override
+      public ScanHandler answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+        StreamObserver<FlatRow> flatRowObserver = 
invocationOnMock.getArgument(1);
+        new Thread() {
+          @Override
+          public void run() {
+            for (int i = 0; i < rows.size(); i++) {
+              flatRowObserver.onNext(rows.get(i));
+            }
+            flatRowObserver.onCompleted();
+          }
+        }.start();
+
+        return scanHandler;
+      }
+    };
+  }
+
+  public List<FlatRow> generateSegmentResult(String prefix, int startIndex, 
int count, boolean largeRow) {

Review Comment:
   you use largeRow=true in exactly one place, and have it set to false everywhere else... this seems like a bad tradeoff.
   
   Either have the exception inline the code it needs or have 2 wrappers for 
this method:
   
   generateSegmentResult(prefix,start,count) { 
generateSegmentResult(prefix,start,count, false)} 
   generateLargeSegmentResult(prefix,start,count) { 
generateSegmentResult(prefix,start,count, true)} 
   
   



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -178,4 +554,59 @@ private void verifyMetricWasSet(String method, String 
status, long count) {
         (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
     assertEquals(count, (long) container.getCounter(name).getCumulative());
   }
+
+  public Answer<ScanHandler> mockReadRowsAnswer(List<FlatRow> rows) {
+    return new Answer<ScanHandler>() {
+      @Override
+      public ScanHandler answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+        StreamObserver<FlatRow> flatRowObserver = 
invocationOnMock.getArgument(1);
+        new Thread() {
+          @Override
+          public void run() {
+            for (int i = 0; i < rows.size(); i++) {
+              flatRowObserver.onNext(rows.get(i));
+            }
+            flatRowObserver.onCompleted();
+          }
+        }.start();
+
+        return scanHandler;
+      }
+    };
+  }
+
+  public List<FlatRow> generateSegmentResult(String prefix, int startIndex, 
int count, boolean largeRow) {

Review Comment:
   looking at the call site it's really hard to know what false and true mean here



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +154,354 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);

Review Comment:
   Now that you have a constructor that takes the underlying types, you no longer need to mock the source — just use the constructor.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +154,354 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001, 
... b00199, b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+    ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+
+    List<List<FlatRow>> expectedResults = ImmutableList.of(
+        generateSegmentResult(DEFAULT_PREFIX,0,SEGMENT_SIZE, false),
+        generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE, 
false),
+        ImmutableList.of());
+
+    OngoingStubbing
+        <?> stub = 
when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));

Review Comment:
   this is a weird line break



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());
+
+      long maxSegmentByteSize =
+          (long)(Runtime.getRuntime().totalMemory()
+              * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest 
request, long maxSegmentByteSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      System.out.println("HI");
+      if (buffer.size() < refillSegmentWaterMark && future == null ) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      currentRow = buffer.poll();
+      return currentRow != null;
+    }
+
+    private Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      // When the nextRequest is null, the last fill completed and the buffer 
contains the last rows
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      Row row = FlatRowConverter.convert(flatRow);
+                      currentByteSize += row.getSerializedSize();
+                      rows.add(row);
+
+                      if (currentByteSize > maxSegmentByteSize) {
+                        byteLimitReached = true;
+                        atomicScanHandler.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      // When requested rows < limit, the current request will 
be the last
+                      if (byteLimitReached || rows.size() == 
nextRequest.getRowsLimit()) {
+                        nextNextRequest =
+                            truncateRequest(nextRequest, rows.get(rows.size() 
- 1).getKey());
+                      }
+                      f.set(new UpstreamResults(rows, nextNextRequest));
+                    }
+                  });
+      atomicScanHandler.set(handler);
+      return f;
+    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+      catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+        if (e.getCause() instanceof StatusRuntimeException) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    private ReadRowsRequest truncateRequest(ReadRowsRequest request, 
ByteString lastKey) {
+      RowSet.Builder segment = RowSet.newBuilder();
+
+      for (RowRange rowRange : request.getRows().getRowRangesList()) {
+        int startCmp = StartPoint.extract(rowRange).compareTo(new 
StartPoint(lastKey, true));
+        int endCmp = EndPoint.extract(rowRange).compareTo(new 
EndPoint(lastKey, true));
+
+        if (startCmp > 0) {
+          // If the startKey is passed the split point than add the whole range
+          segment.addRowRanges(rowRange);
+        } else if (endCmp > 0) {
+          // Row is split, remove all read rowKeys and split RowSet at last 
buffered Row
+          RowRange subRange = 
rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+          segment.addRowRanges(subRange);
+        }
+      }
+      if (segment.getRowRangesCount() == 0) {
+        return null;
+      }
+      ReadRowsRequest newRequest =
+          ReadRowsRequest.newBuilder()
+              .setTableName(request.getTableName())
+              .setRows(segment.build())
+              .setFilter(request.getFilter())
+              .setRowsLimit(request.getRowsLimit())
+              .build();
+
+      return newRequest;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (session == null) {
+        // Only possible when previously closed, so we know that results is 
also null.

Review Comment:
   what does this comment mean?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());
+
+      long maxSegmentByteSize =
+          (long)(Runtime.getRuntime().totalMemory()
+              * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest 
request, long maxSegmentByteSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      System.out.println("HI");

Review Comment:
   hello :)
   please remove debug code



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());
+
+      long maxSegmentByteSize =
+          (long)(Runtime.getRuntime().totalMemory()
+              * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest 
request, long maxSegmentByteSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      System.out.println("HI");
+      if (buffer.size() < refillSegmentWaterMark && future == null ) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      currentRow = buffer.poll();
+      return currentRow != null;
+    }
+
+    private Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      // When the nextRequest is null, the last fill completed and the buffer 
contains the last rows
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      Row row = FlatRowConverter.convert(flatRow);
+                      currentByteSize += row.getSerializedSize();
+                      rows.add(row);
+
+                      if (currentByteSize > maxSegmentByteSize) {
+                        byteLimitReached = true;
+                        atomicScanHandler.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      // When requested rows < limit, the current request will 
be the last
+                      if (byteLimitReached || rows.size() == 
nextRequest.getRowsLimit()) {
+                        nextNextRequest =
+                            truncateRequest(nextRequest, rows.get(rows.size() 
- 1).getKey());
+                      }
+                      f.set(new UpstreamResults(rows, nextNextRequest));
+                    }
+                  });
+      atomicScanHandler.set(handler);
+      return f;
+    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+      catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+        if (e.getCause() instanceof StatusRuntimeException) {
+          throw new IOException(e);
+        }
+      }

Review Comment:
   This looks unfinished:
   - you are checking InterruptedException twice
   - you lost the metric increment
   
   I think you want something like:
   ```
   try {
   ...
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new IOException(e);
   } catch(ExecutionException e) {
     Throwable cause = e.getCause();
     if (cause instanceof StatusRuntimeException) {
       // increment error counts
     }
     throw new IOException(cause);
   } 
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private static ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());
+
+      long maxSegmentByteSize =
+          (long)(Runtime.getRuntime().totalMemory()
+              * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest 
request, long maxSegmentByteSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      System.out.println("HI");
+      if (buffer.size() < refillSegmentWaterMark && future == null ) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      currentRow = buffer.poll();
+      return currentRow != null;
+    }
+
    /**
     * Starts an asynchronous read of the next segment and returns a future that completes
     * with the rows read plus the follow-up request (null when the scan is exhausted).
     * Stops accumulating early, via ScanHandler.cancel(), once the segment's byte budget
     * is exceeded.
     */
    private Future<UpstreamResults> fetchNextSegment() {
      SettableFuture<UpstreamResults> f = SettableFuture.create();
      // When the nextRequest is null, the last fill completed and the buffer contains the last rows
      if (nextRequest == null) {
        f.set(new UpstreamResults(ImmutableList.of(), null));
        return f;
      }

      // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
      AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
      ScanHandler handler =
          session
              .getDataClient()
              .readFlatRows(
                  nextRequest,
                  new StreamObserver<FlatRow>() {
                    // State accumulated across onNext callbacks for this one segment.
                    List<Row> rows = new ArrayList<>();
                    long currentByteSize = 0;
                    boolean byteLimitReached = false;

                    @Override
                    public void onNext(FlatRow flatRow) {
                      Row row = FlatRowConverter.convert(flatRow);
                      currentByteSize += row.getSerializedSize();
                      rows.add(row);

                      if (currentByteSize > maxSegmentByteSize) {
                        byteLimitReached = true;
                        // NOTE(review): if the transport can invoke onNext before
                        // readFlatRows returns and atomicScanHandler.set(handler) runs
                        // below, get() would return null and NPE here — confirm the
                        // client's callback ordering guarantees.
                        atomicScanHandler.get().cancel();
                        return;
                      }
                    }

                    @Override
                    public void onError(Throwable e) {
                      f.setException(e);
                    }

                    @Override
                    public void onCompleted() {
                      ReadRowsRequest nextNextRequest = null;

                      // When requested rows < limit, the current request will be the last
                      if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
                        nextNextRequest =
                            truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
                      }
                      f.set(new UpstreamResults(rows, nextNextRequest));
                    }
                  });
      atomicScanHandler.set(handler);
      return f;
    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+      catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+        if (e.getCause() instanceof StatusRuntimeException) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    private ReadRowsRequest truncateRequest(ReadRowsRequest request, 
ByteString lastKey) {
+      RowSet.Builder segment = RowSet.newBuilder();
+
+      for (RowRange rowRange : request.getRows().getRowRangesList()) {
+        int startCmp = StartPoint.extract(rowRange).compareTo(new 
StartPoint(lastKey, true));
+        int endCmp = EndPoint.extract(rowRange).compareTo(new 
EndPoint(lastKey, true));
+
+        if (startCmp > 0) {
+          // If the startKey is passed the split point than add the whole range
+          segment.addRowRanges(rowRange);
+        } else if (endCmp > 0) {
+          // Row is split, remove all read rowKeys and split RowSet at last 
buffered Row
+          RowRange subRange = 
rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+          segment.addRowRanges(subRange);
+        }
+      }
+      if (segment.getRowRangesCount() == 0) {
+        return null;
+      }
+      ReadRowsRequest newRequest =

Review Comment:
   please reuse the original request:
   
   ```
   RowSet oldRowSet = request.getRows();
   ReadRowsRequest.Builder requestBuilder = request.toBuilder();
   requestBuilder.clearRows();
   
   RowSet.Builder newRowset = RowSet.newBuilder();
   for(...) {...}
   requestBuilder.setRowSet(newRowset)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to