ahmedabu98 commented on code in PR #31924:
URL: https://github.com/apache/beam/pull/31924#discussion_r1690115352
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1318,24 +1361,61 @@ public void startBundle(StartBundleContext c) throws
IOException {
badRecords = new HashSet<>();
}
+ private final Counter throttlingMsecs =
+ Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME);
+
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws
Exception {
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
-
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record,
window));
+ Instant writeStart = Instant.now();
+ pendingThrottlingMsecs = 0;
+ bigtableWriter
+ .writeRecord(record)
+ .whenComplete(handleMutationException(record, window, writeStart));
+ if (pendingThrottlingMsecs > 0) {
+ throttlingMsecs.inc(pendingThrottlingMsecs);
+ }
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0)
+ 1);
}
private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
- KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+ KV<ByteString, Iterable<Mutation>> record, BoundedWindow window,
Instant writeStart) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
+ // Exception due to resource unavailable or rate limited,
+ // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+ boolean isResourceException = false;
+ if (exception instanceof StatusRuntimeException) {
+ StatusRuntimeException se = (StatusRuntimeException) exception;
+ if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+ || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus()))
{
+ isResourceException = true;
+ }
+ } else if (exception instanceof DeadlineExceededException
+ || exception instanceof ResourceExhaustedException) {
+ isResourceException = true;
+ }
+ if (isResourceException) {
+ pendingThrottlingMsecs = new Duration(writeStart,
Instant.now()).getMillis();
+ }
failures.add(new BigtableWriteException(record, exception));
}
+ } else {
+ // add the excessive amount to throttling metrics if elapsed time >
target latency
+ if (writeOptions.getThrottlingReportTargetMs() != null
+ && writeOptions.getThrottlingReportTargetMs() > 0) {
Review Comment:
Yep, that sounds reasonable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]