igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r865135794
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1262,6 +1288,11 @@ public List<ByteKeyRange> getRanges() {
return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
}
+ public @Nullable Integer getMaxBufferElementCount() {
+ Integer bufferLimit = readOptions.getMaxBufferElementCount();
+ return bufferLimit != null ? bufferLimit : null;
Review Comment:
`return readOptions.getMaxBufferElementCount()`
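i.e., a minimal sketch of the simplified getter (the declaration is from the diff above; only the body changes):
```java
public @Nullable Integer getMaxBufferElementCount() {
  // The local variable and ternary were a no-op: if the stored value is null,
  // returning it directly yields null anyway.
  return readOptions.getMaxBufferElementCount();
}
```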
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +404,30 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
+ * This function will switch the base BigtableIO.Reader class to using the SegmentReader.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>When we have a builder, we initialize the value. When they call the method then we
+ * override the value.
+ */
+ @Experimental(Kind.SOURCE_SINK)
+ public Read withMaxBufferElementCount(Integer maxBufferElementCount) {
+ System.out.println(maxBufferElementCount);
Review Comment:
please remove printlns
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +404,30 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
+ * This function will switch the base BigtableIO.Reader class to using the SegmentReader.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>When we have a builder, we initialize the value. When they call the method then we
+ * override the value.
+ */
+ @Experimental(Kind.SOURCE_SINK)
+ public Read withMaxBufferElementCount(Integer maxBufferElementCount) {
Review Comment:
This is a bit confusing: can the user disable this behavior? If so, how do
they do it? Looking at the function signature, I would expect it to be by
passing null, since you are accepting a boxed Integer instead of a primitive
int. But then in the body I see a non-null precondition check.
Please update the javadoc to explain how this behavior is disabled, and make
the impl consistent.
I think the most consistent change would be to (see the sketch after this
list):
- document that null will use the streaming behavior
- annotate arg with `@Nullable`
- drop the not-null check
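A sketch of what that consistent version could look like. The builder plumbing (`toBuilder()`, `getBigtableReadOptions()`, `setBigtableReadOptions()`) is an assumption about the surrounding AutoValue class, and the javadoc wording is illustrative, not a verbatim patch:
```java
/**
 * Returns a new {@link BigtableIO.Read} that will break up read requests into
 * smaller batches via the segment reader. Passing {@code null} keeps the
 * default streaming read behavior.
 *
 * <p>Does not modify this object.
 */
@Experimental(Kind.SOURCE_SINK)
public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
  // No non-null check: null is the documented way to keep streaming reads.
  return toBuilder()
      .setBigtableReadOptions(
          getBigtableReadOptions().setMaxBufferElementCount(maxBufferElementCount))
      .build();
}
```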
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
Review Comment:
refillLowWatermark
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java:
##########
@@ -53,11 +56,17 @@ abstract static class Builder {
abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
+ abstract Builder setMaxBufferElementCount(Integer maxBufferElementCount);
Review Comment:
Annotate the arg with `@Nullable`
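e.g. (assuming the same `@Nullable` annotation used elsewhere in this package):
```java
abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);
```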
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
+ tableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ buffer = new ArrayDeque<>();
+ lastFillComplete = false;
+ byteLimitReached = false;
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+ ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+ ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
+ }
+ // Presort the ranges so that future segmentation can exit early when splitting the row set
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+ .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
+
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+ future = startNextSegmentRead();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
+ if (!splitRowSet(lastFetchedRow)) {
+ return false;
+ }
+ future = startNextSegmentRead();
+ }
+ if (buffer.isEmpty()) {
+ if (future == null || lastFillComplete) {
+ return false;
+ }
+ waitReadRowsFuture();
+ }
+ // If the last fill is equal to row limit, the lastFillComplete flag will not be true
+ // until another RPC is called which will return 0 rows
+ if (buffer.isEmpty() && lastFillComplete) {
+ return false;
+ }
+ currentRow = FlatRowConverter.convert(buffer.remove());
+ return currentRow != null;
+ }
+
+ private SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> startNextSegmentRead() {
+ SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> f = SettableFuture.create();
+ long availableMemory =
+ Runtime.getRuntime().maxMemory()
+ - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+ long bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * availableMemory);
+ // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+ ScanHandler handler;
+ handler =
Review Comment:
ScanHandler handler = session...readFlatRows()...
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
+ tableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ buffer = new ArrayDeque<>();
+ lastFillComplete = false;
+ byteLimitReached = false;
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+ ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+ ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
+ }
+ // Presort the ranges so that future segmentation can exit early when splitting the row set
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+ .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
+
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+ future = startNextSegmentRead();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
+ if (!splitRowSet(lastFetchedRow)) {
+ return false;
+ }
+ future = startNextSegmentRead();
+ }
+ if (buffer.isEmpty()) {
+ if (future == null || lastFillComplete) {
+ return false;
+ }
+ waitReadRowsFuture();
+ }
+ // If the last fill is equal to row limit, the lastFillComplete flag will not be true
+ // until another RPC is called which will return 0 rows
+ if (buffer.isEmpty() && lastFillComplete) {
+ return false;
+ }
+ currentRow = FlatRowConverter.convert(buffer.remove());
+ return currentRow != null;
+ }
+
+ private SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> startNextSegmentRead() {
+ SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> f = SettableFuture.create();
+ long availableMemory =
+ Runtime.getRuntime().maxMemory()
+ - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+ long bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * availableMemory);
+ // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+ ScanHandler handler;
+ handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ buildReadRowsRequest(),
+ new StreamObserver<FlatRow>() {
+ List<FlatRow> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ rows.add(flatRow);
+ currentByteSize +=
+ flatRow.getRowKey().size()
+ + flatRow.getCells().stream()
+ .mapToLong(c -> c.getQualifier().size() + c.getValue().size())
+ .sum();
Review Comment:
please extract this to a helper method `computeRowSize(FlatRow)`
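A minimal sketch of that helper, lifted directly from the accumulation logic above:
```java
private static long computeRowSize(FlatRow flatRow) {
  // Row key bytes plus the qualifier and value bytes of every cell.
  return flatRow.getRowKey().size()
      + flatRow.getCells().stream()
          .mapToLong(c -> c.getQualifier().size() + c.getValue().size())
          .sum();
}
```
`onNext` then reduces to `currentByteSize += computeRowSize(flatRow);`.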
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java:
##########
@@ -80,6 +89,11 @@ void validate() {
if (getRowFilter() != null && getRowFilter().isAccessible()) {
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
}
+ if (getMaxBufferElementCount() != null) {
+ checkArgument(getMaxBufferElementCount() != null, "maxBufferElementCount can not be null");
Review Comment:
The if statement already ensures that the count is not null, so you don't
need the checkArgument.
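A sketch of the reduced validation, assuming the positivity check (referenced elsewhere in this review) is what remains:
```java
if (getMaxBufferElementCount() != null) {
  // Non-null is guaranteed by the enclosing if; only the range needs checking.
  checkArgument(
      getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
}
```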
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java:
##########
@@ -53,11 +56,17 @@ abstract static class Builder {
abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
+ abstract Builder setMaxBufferElementCount(Integer maxBufferElementCount);
+
abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
abstract BigtableReadOptions build();
}
+ BigtableReadOptions setMaxBufferElementCount(Integer maxBufferElementCount) {
Review Comment:
mark arg as `@Nullable`
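i.e., a sketch mirroring the builder change above (the `toBuilder()` call is an assumption about this AutoValue class):
```java
BigtableReadOptions setMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
  return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
}
```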
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
Review Comment:
make this final and initialize it to an empty ArrayDeque in the constructor
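A sketch of that change (a field initializer here is equivalent to assigning in the constructor):
```java
// Field becomes final and is created once, instead of being reassigned in start():
private final Queue<FlatRow> buffer = new ArrayDeque<>();
```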
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +404,30 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
+ * This function will switch the base BigtableIO.Reader class to using the SegmentReader.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>When we have a builder, we initialize the value. When they call the method then we
+ * override the value.
+ */
+ @Experimental(Kind.SOURCE_SINK)
+ public Read withMaxBufferElementCount(Integer maxBufferElementCount) {
+ System.out.println(maxBufferElementCount);
+ checkArgument(maxBufferElementCount != null, "maxBufferElementCount can not be null");
+ checkArgument(maxBufferElementCount > 0, "maxBufferElementCount can not be zero or negative");
Review Comment:
the ReadOptions already has this check, so you can remove the check here
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +404,30 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
+ * This function will switch the base BigtableIO.Reader class to using the SegmentReader.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>When we have a builder, we initialize the value. When they call the method then we
+ * override the value.
+ */
+ @Experimental(Kind.SOURCE_SINK)
+ public Read withMaxBufferElementCount(Integer maxBufferElementCount) {
+ System.out.println(maxBufferElementCount);
+ checkArgument(maxBufferElementCount != null, "maxBufferElementCount can not be null");
Review Comment:
checkNotNull
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
+ tableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ buffer = new ArrayDeque<>();
+ lastFillComplete = false;
+ byteLimitReached = false;
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+ ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+ ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
+ }
+ // Presort the ranges so that future segmentation can exit early when splitting the row set
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+ .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
+
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+ future = startNextSegmentRead();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
+ if (!splitRowSet(lastFetchedRow)) {
+ return false;
+ }
+ future = startNextSegmentRead();
+ }
+ if (buffer.isEmpty()) {
+ if (future == null || lastFillComplete) {
+ return false;
+ }
+ waitReadRowsFuture();
+ }
+ // If the last fill is equal to row limit, the lastFillComplete flag will not be true
+ // until another RPC is called which will return 0 rows
+ if (buffer.isEmpty() && lastFillComplete) {
+ return false;
+ }
+ currentRow = FlatRowConverter.convert(buffer.remove());
+ return currentRow != null;
+ }
+
+ private SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> startNextSegmentRead() {
+ SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> f = SettableFuture.create();
+ long availableMemory =
+ Runtime.getRuntime().maxMemory()
+ - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+ long bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * availableMemory);
Review Comment:
I think you can just simplify this to 10% of the max memory and move it to
the constructor.
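A sketch of that simplification, reusing the constant from the snippet above (making the limit a final field is implied by moving the computation to the constructor):
```java
// In the constructor: reserve a fixed fraction (10%) of the max heap for the buffer,
// instead of recomputing available memory before every segment read.
this.bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * Runtime.getRuntime().maxMemory());
```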
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
Review Comment:
final
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
Review Comment:
final
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
Review Comment:
I really don't like storing the entire source here... it exposes too much
surface to the BigtableSegmentReaderImpl. I think it would be cleaner to have
the constructor take 3 args: BigtableSession, ReadRowsRequest, and the
segment size. To maintain some consistency with the old impl, introduce a
factory method that extracts the relevant parts from the source:
```java
class BigtableSegmentReaderImpl {
  static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
    ReadRowsRequest request =
        ReadRowsRequest.newBuilder()
            .setTableName(
                session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
            .setRows(
                RowSet.newBuilder()
                    .addAllRowRanges(
                        source.getRanges().stream()
                            .sorted(RANGE_START_COMPARATOR)
                            .collect(Collectors.toList())))
            .setRowFilter(
                MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance()))
            .build();
    return new BigtableSegmentReaderImpl(session, request, source.getMaxBufferElementCount());
  }

  BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest request, int segmentSize) {
    // ...
  }
}
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
Review Comment:
make this final and construct it in the constructor
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
+ tableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ buffer = new ArrayDeque<>();
+ lastFillComplete = false;
+ byteLimitReached = false;
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+ ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+ ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
+ }
+ // Presort the ranges so that future segmentation can exit early when splitting the row set
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+ .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
+
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ serviceCallMetric =
Review Comment:
Please extract this to a private static helper method that takes a
BigtableSession and constructs the label map, and invoke it in the
constructor. From what I can tell, there is no reason to wait until start()
to build this. Also make the field final.
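A sketch of such a helper, built from the labels above; the table id is not available from the session, so it is passed in explicitly here (that extra parameter is an assumption):
```java
private static ServiceCallMetric createCallMetric(BigtableSession session, String tableId) {
  HashMap<String, String> baseLabels = new HashMap<>();
  baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
  baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
  baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
  baseLabels.put(
      MonitoringInfoConstants.Labels.RESOURCE,
      GcpResourceIdentifiers.bigtableResource(
          session.getOptions().getProjectId(), session.getOptions().getInstanceId(), tableId));
  baseLabels.put(
      MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
  baseLabels.put(
      MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
  baseLabels.put(
      MonitoringInfoConstants.Labels.TABLE_ID,
      GcpResourceIdentifiers.bigtableTableID(
          session.getOptions().getProjectId(), session.getOptions().getInstanceId(), tableId));
  return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
}
```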
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
Review Comment:
I think you should collapse these into a single flag, `upstreamExhausted`,
and only access it in `startNextSegmentRead`.
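i.e., something along these lines (name from the comment above; exactly where it gets set inside `startNextSegmentRead` is left open):
```java
// Replaces the lastFillComplete/byteLimitReached pair: true once the upstream
// scan has delivered everything it is going to deliver.
private boolean upstreamExhausted = false;
```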
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private Row currentRow;
+ private Queue<FlatRow> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+ private ByteString lastFetchedRow;
+ private boolean lastFillComplete;
+ private boolean byteLimitReached;
+
+ private final int segmentLimit;
+ private final int segmentWaterMark;
+ private final String tableName;
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ if (source.getMaxBufferElementCount() != null && source.getMaxBufferElementCount() != 0) {
+ this.segmentLimit = source.getMaxBufferElementCount();
+ } else {
+ this.segmentLimit = DEFAULT_SEGMENT_SIZE;
+ }
+ // Asynchronously refill buffer when there is 10% of the elements are left
+ this.segmentWaterMark = segmentLimit / 10;
+ tableName = session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ buffer = new ArrayDeque<>();
+ lastFillComplete = false;
+ byteLimitReached = false;
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+ ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+ ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
+ }
+ // Presort the ranges so that future segmentation can exit early when splitting the row set
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+ .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
+
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID, session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ serviceCallMetric =
+ new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+ future = startNextSegmentRead();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
+ if (!splitRowSet(lastFetchedRow)) {
+ return false;
+ }
Review Comment:
push this down into startNextSegmentRead()
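One possible shape for that change; this sketch assumes `splitRowSet` can tolerate being called before the first fetch, and uses a null future to signal exhaustion:
```java
// advance() no longer does the split bookkeeping itself:
if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
  future = startNextSegmentRead();
}

// startNextSegmentRead() performs the split before issuing the request:
private @Nullable SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> startNextSegmentRead() {
  if (!splitRowSet(lastFetchedRow)) {
    return null; // nothing left to read; advance() treats a null future as done
  }
  // ... existing readFlatRows() logic ...
}
```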
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]