igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r870643035


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetch future segment can exit early when 
splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {

Review Comment:
   all of these are already present in ReadRowsRequest?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to