gemini-code-assist[bot] commented on code in PR #39021: URL: https://github.com/apache/beam/pull/39021#discussion_r3437197193
########## sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/MovingSum.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.util; + +import java.util.Arrays; + +/** + * Class that keeps track of a rolling window sum. + * + * <p>For use in tracking recent performance of the connector. + * + * <p>Intended to be similar to {@link org.apache.beam.sdk.util.MovingFunction}, but for convenience + * we expose the count of entries as well so this doubles as a moving average tracker. 
+ */ +public class MovingSum { + private final int numBuckets; + private final long bucketMs; + + private int currentIndex; + private long currentMsSinceEpoch; + private final long[] sums; + private final long[] counts; + + public MovingSum(long windowMs, long bucketMs) { + if (windowMs < bucketMs || bucketMs <= 0) { + throw new IllegalArgumentException("windowMs >= bucketMs > 0 please"); + } + this.numBuckets = (int) Math.ceil((double) windowMs / bucketMs); + this.bucketMs = bucketMs; + this.sums = new long[this.numBuckets]; + this.counts = new long[this.numBuckets]; + this.currentIndex = 0; + this.currentMsSinceEpoch = 0; + Arrays.fill(this.sums, 0L); + Arrays.fill(this.counts, 0L); + } + + private void reset(long now) { + this.currentIndex = 0; + this.currentMsSinceEpoch = (now / bucketMs) * bucketMs; + Arrays.fill(sums, 0L); + Arrays.fill(counts, 0L); + } + + private void flush(long now) { + if (now >= (currentMsSinceEpoch + bucketMs * numBuckets)) { + // Time moved forward so far that all currently held data is outside of + // the window. It is faster to simply reset our data. + reset(now); + return; + } + + while (now > currentMsSinceEpoch + bucketMs) {

Review Comment:

The `while` loop condition uses `>` instead of `>=`. This causes a correctness bug when `now` is exactly on a bucket boundary (i.e., `now == currentMsSinceEpoch + bucketMs`).

Specifically, if `now` is exactly at the start of the next bucket, the loop condition `now > currentMsSinceEpoch + bucketMs` evaluates to `false`, so the bucket is not advanced and the data is written to the old bucket. On a subsequent call even 1 millisecond later, the loop will evaluate to `true`, advance the bucket, and overwrite the newly added data with `0`, causing data loss.

Please change the condition to `>=`.
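To make the boundary case concrete, here is a minimal, self-contained sketch. It assumes that the `flush()` loop body, which the hunk above cuts off, advances one bucket per iteration and zeroes the bucket it moves into. `MovingSumSketch` and its `advanceOnBoundary` flag are hypothetical names used only to compare `>` against `>=`; this is not the PR's class.

```java
import java.util.Arrays;

// Hypothetical reconstruction for illustration only. The loop body in flush()
// is an assumption: advance one bucket per iteration and clear the bucket entered.
final class MovingSumSketch {
  private final int numBuckets;
  private final long bucketMs;
  private final boolean advanceOnBoundary; // true: ">=" (suggested); false: ">" (as in the PR)
  private final long[] sums;
  private final long[] counts;
  private int currentIndex = 0;
  private long currentMsSinceEpoch = 0; // start time of the bucket at currentIndex

  MovingSumSketch(long windowMs, long bucketMs, boolean advanceOnBoundary) {
    if (windowMs < bucketMs || bucketMs <= 0) {
      throw new IllegalArgumentException("windowMs >= bucketMs > 0 please");
    }
    this.numBuckets = (int) Math.ceil((double) windowMs / bucketMs);
    this.bucketMs = bucketMs;
    this.advanceOnBoundary = advanceOnBoundary;
    this.sums = new long[numBuckets];
    this.counts = new long[numBuckets];
  }

  private void flush(long now) {
    if (now >= currentMsSinceEpoch + bucketMs * numBuckets) {
      // Everything held is outside the window: restart from now's bucket.
      currentIndex = 0;
      currentMsSinceEpoch = (now / bucketMs) * bucketMs;
      Arrays.fill(sums, 0L);
      Arrays.fill(counts, 0L);
      return;
    }
    while (advanceOnBoundary
        ? now >= currentMsSinceEpoch + bucketMs
        : now > currentMsSinceEpoch + bucketMs) {
      // Step into the next bucket and clear what it held from the previous pass.
      currentMsSinceEpoch += bucketMs;
      currentIndex = (currentIndex + 1) % numBuckets;
      sums[currentIndex] = 0;
      counts[currentIndex] = 0;
    }
  }

  void add(long now, long value) {
    flush(now);
    sums[currentIndex] += value;
    counts[currentIndex] += 1;
  }

  long sum(long now) {
    flush(now);
    return Arrays.stream(sums).sum();
  }
}
```

With a 3000 ms window and 1000 ms buckets, `>=` moves a sample added at exactly 1000 ms into its own bucket, so it is still counted at 3999 ms and dropped at 4000 ms. Under the same assumed loop body, `>` leaves that sample in bucket 0 next to a sample added at 0 ms, and the full-reset shortcut discards both at 3000 ms, one bucket early.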
```suggestion
    while (now >= currentMsSinceEpoch + bucketMs) {
```

########## sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.throttling; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A wrapper around the AdaptiveThrottler that also handles logging and signaling throttling to the + * SDK harness using the provided namespace. + * + * <p>For usage, instantiate one instance of a ReactiveThrottler class for a PTransform. When making + * remote calls to a service, preface that call with the throttle() method to potentially + * pre-emptively throttle the request. This will throttle future calls based on the failure rate of + * preceding calls, with higher failure rates leading to longer periods of throttling to allow + * system recovery. capture the timestamp of the attempted request, then execute the request code. + * On a success, call successfulRequest(timestamp) to report the success to the throttler.
+ */ +public class ReactiveThrottler extends AdaptiveThrottler { + private static final Logger LOG = LoggerFactory.getLogger(ReactiveThrottler.class); + private static final long SECONDS_TO_MILLISECONDS = 1000L; + + private final ThrottlingSignaler throttlingSignaler; + private final int throttleDelaySecs; + + /** + * Initializes the ReactiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. + * @param namespace the namespace to use for logging and signaling throttling is occurring. + * @param throttleDelaySecs the amount of time in seconds to wait after preemptively throttled + * requests. + */ + public ReactiveThrottler( + long windowMs, long bucketMs, double overloadRatio, String namespace, int throttleDelaySecs) { + super(windowMs, bucketMs, overloadRatio); + this.throttlingSignaler = new ThrottlingSignaler(namespace); + this.throttleDelaySecs = throttleDelaySecs; + } + + /** + * Stops request code from advancing while the underlying AdaptiveThrottler is signaling to + * preemptively throttle the request. Automatically handles logging the throttling and signaling + * to the SDK harness that the request is being throttled. This should be called in any context + * where a call to a remote service is being contacted prior to the call being performed. + */ + public void throttle() throws InterruptedException { + while (throttleRequest(System.currentTimeMillis())) { + LOG.info("Delaying request for {} seconds due to previous failures", throttleDelaySecs); + Thread.sleep(throttleDelaySecs * SECONDS_TO_MILLISECONDS); + throttlingSignaler.signalThrottling(throttleDelaySecs * SECONDS_TO_MILLISECONDS); + } + }

Review Comment:

The `while` loop in `throttle()` has no maximum limit or timeout. If a remote service experiences a prolonged outage, the throttling probability will approach 100%, and worker threads could stay in this loop for an unbounded amount of time. This can lead to worker thread pool exhaustion, lease renewal failures, and stuck pipelines.

Additionally, logging at `INFO` level for every throttled request can cause severe log spam in high-throughput pipelines.

To address both issues, consider changing the `while` loop to a single `if` check (delaying once per request to apply backpressure without blocking indefinitely) and changing the log level to `DEBUG`.

```suggestion
  public void throttle() throws InterruptedException {
    if (throttleRequest(System.currentTimeMillis())) {
      LOG.debug("Delaying request for {} seconds due to previous failures", throttleDelaySecs);
      Thread.sleep(throttleDelaySecs * SECONDS_TO_MILLISECONDS);
      throttlingSignaler.signalThrottling(throttleDelaySecs * SECONDS_TO_MILLISECONDS);
    }
  }
```

########## sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/util/MovingSumTest.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.components.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MovingSumTest { + + @Test + public void testMovingSumBasic() { + MovingSum movingSum = new MovingSum(10000, 1000); + assertFalse(movingSum.hasData(1000)); + + movingSum.add(1000, 5); + assertTrue(movingSum.hasData(1000)); + assertEquals(5, movingSum.sum(1000)); + assertEquals(1, movingSum.count(1000)); + + movingSum.add(1500, 10); + assertEquals(15, movingSum.sum(1500)); + assertEquals(2, movingSum.count(1500)); + + // Wait 11 seconds (moving completely outside window)

Review Comment:

The current unit tests only lightly exercise the `while` loop in `flush()`. Because `MovingSum` starts its clock at 0, the `add(1500, 10)` call enters the loop for a single one-bucket step; otherwise the tests only add within the same bucket or jump completely outside the window (which triggers the `reset()` shortcut).

Please add a test case that advances the time by several buckets while staying within the window, so that multi-step bucket advancement is covered.

```java
    movingSum.add(1500, 10);
    assertEquals(15, movingSum.sum(1500));
    assertEquals(2, movingSum.count(1500));

    // Advance by 2 buckets (from 1000 to 3000)
    assertEquals(15, movingSum.sum(3000));
    movingSum.add(3500, 20);
    assertEquals(35, movingSum.sum(3500));
    assertEquals(3, movingSum.count(3500));

    // Wait 11 seconds (moving completely outside window)
```

########## sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.throttling; + +import java.util.Random; +import org.apache.beam.sdk.io.components.util.MovingSum; + +/** + * Implements adaptive throttling. + * + * <p>See + * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg + * for a full discussion of the use case and algorithm applied. + */ +public class AdaptiveThrottler { + + // The target minimum number of requests per samplePeriodMs, even if no + // requests succeed. Must be greater than 0, else we could throttle to zero. + // Because every decision is probabilistic, there is no guarantee that the + // request rate in any given interval will not be zero. (This is the +1 from + // the formula in + // https://landing.google.com/sre/book/chapters/handling-overload.html ) + public static final int MIN_REQUESTS = 1; + + private final MovingSum allRequests; + private final MovingSum successfulRequests; + private final double overloadRatio; + private final Random random; + + /** + * Initializes AdaptiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. 
+ */ + public AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio) { + this(windowMs, bucketMs, overloadRatio, new Random()); + } + + // visible for testing + AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio, Random random) { + this.allRequests = new MovingSum(windowMs, bucketMs); + this.successfulRequests = new MovingSum(windowMs, bucketMs); + this.overloadRatio = overloadRatio; + this.random = random; + }

Review Comment:

Please add validation for `overloadRatio` in the constructor. If `overloadRatio` is below 1.0, `throttlingProbability()` is positive even when every request succeeds, so the throttler preemptively throttles healthy traffic, which is highly undesirable. At exactly 1.0 the probability reduces to `(allReqs - successfulReqs) / (allReqs + MIN_REQUESTS)`, which is still positive whenever a request has been counted by `throttleRequest()` but has not yet reported success (for example, while it is in flight). Requiring a value strictly greater than 1.0 avoids both cases.

```java
  AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio, Random random) {
    if (overloadRatio <= 1.0) {
      throw new IllegalArgumentException("overloadRatio must be greater than 1.0");
    }
    this.allRequests = new MovingSum(windowMs, bucketMs);
    this.successfulRequests = new MovingSum(windowMs, bucketMs);
    this.overloadRatio = overloadRatio;
    this.random = random;
  }
```

########## sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.throttling; + +import java.util.Random; +import org.apache.beam.sdk.io.components.util.MovingSum; + +/** + * Implements adaptive throttling. + * + * <p>See + * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg + * for a full discussion of the use case and algorithm applied. + */ +public class AdaptiveThrottler { + + // The target minimum number of requests per samplePeriodMs, even if no + // requests succeed. Must be greater than 0, else we could throttle to zero. + // Because every decision is probabilistic, there is no guarantee that the + // request rate in any given interval will not be zero. (This is the +1 from + // the formula in + // https://landing.google.com/sre/book/chapters/handling-overload.html ) + public static final int MIN_REQUESTS = 1; + + private final MovingSum allRequests; + private final MovingSum successfulRequests; + private final double overloadRatio; + private final Random random; + + /** + * Initializes AdaptiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. 
+ */ + public AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio) { + this(windowMs, bucketMs, overloadRatio, new Random()); + } + + // visible for testing + AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio, Random random) { + this.allRequests = new MovingSum(windowMs, bucketMs); + this.successfulRequests = new MovingSum(windowMs, bucketMs); + this.overloadRatio = overloadRatio; + this.random = random; + } + + protected double throttlingProbability(long now) { + if (!allRequests.hasData(now)) { + return 0.0; + } + long allReqs = allRequests.sum(now); + long successfulReqs = successfulRequests.sum(now); + double prob = (allReqs - overloadRatio * successfulReqs) / (allReqs + MIN_REQUESTS); + return Math.max(0.0, prob); + } + + /** + * Determines whether one RPC attempt should be throttled. + * + * <p>This should be called once each time the caller intends to send an RPC; if it returns true, + * drop or delay that request (calling this function again after the delay). + * + * @param now time in ms since the epoch + * @return true if the caller should throttle or delay the request. + */ + public boolean throttleRequest(long now) { + double prob = throttlingProbability(now); + allRequests.add(now, 1); + return random.nextDouble() < prob; + } + + /** + * Notifies the throttler of a successful request. + * + * <p>Must be called once for each request (for which throttleRequest was previously called) that + * succeeded. + * + * @param now time in ms since the epoch + */ + public void successfulRequest(long now) { + successfulRequests.add(now, 1);

Review Comment:

In Apache Beam, components like throttlers are typically accessed concurrently by multiple worker threads processing elements in parallel. Currently, `AdaptiveThrottler` and `MovingSum` are not thread-safe. Concurrent calls to `throttleRequest` and `successfulRequest` can race on `MovingSum`'s internal state, which can lead to data corruption, `ArrayIndexOutOfBoundsException`, or infinite loops in `flush()`.

To ensure thread safety, please synchronize the public methods of `AdaptiveThrottler`. Note that `ReactiveThrottler.throttle()` should *not* be synchronized because it calls `Thread.sleep()`. Synchronizing only `throttleRequest` and `successfulRequest` is safe: the lock is released before the sleep, so other threads are not blocked while one thread waits.

```suggestion
  public synchronized boolean throttleRequest(long now) {
    double prob = throttlingProbability(now);
    allRequests.add(now, 1);
    return random.nextDouble() < prob;
  }

  /**
   * Notifies the throttler of a successful request.
   *
   * <p>Must be called once for each request (for which throttleRequest was previously called) that
   * succeeded.
   *
   * @param now time in ms since the epoch
   */
  public synchronized void successfulRequest(long now) {
    successfulRequests.add(now, 1);
```

########## sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.components.throttling; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A wrapper around the AdaptiveThrottler that also handles logging and signaling throttling to the + * SDK harness using the provided namespace. + * + * <p>For usage, instantiate one instance of a ReactiveThrottler class for a PTransform. When making + * remote calls to a service, preface that call with the throttle() method to potentially + * pre-emptively throttle the request. This will throttle future calls based on the failure rate of + * preceding calls, with higher failure rates leading to longer periods of throttling to allow + * system recovery. capture the timestamp of the attempted request, then execute the request code. + * On a success, call successfulRequest(timestamp) to report the success to the throttler. + */ +public class ReactiveThrottler extends AdaptiveThrottler { + private static final Logger LOG = LoggerFactory.getLogger(ReactiveThrottler.class); + private static final long SECONDS_TO_MILLISECONDS = 1000L; + + private final ThrottlingSignaler throttlingSignaler; + private final int throttleDelaySecs; + + /** + * Initializes the ReactiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. + * @param namespace the namespace to use for logging and signaling throttling is occurring. + * @param throttleDelaySecs the amount of time in seconds to wait after preemptively throttled + * requests. 
+ */ + public ReactiveThrottler( + long windowMs, long bucketMs, double overloadRatio, String namespace, int throttleDelaySecs) { + super(windowMs, bucketMs, overloadRatio); + this.throttlingSignaler = new ThrottlingSignaler(namespace); + this.throttleDelaySecs = throttleDelaySecs; + }

Review Comment:

Please add validation to ensure `throttleDelaySecs` is strictly greater than 0. If `throttleDelaySecs` is 0, `Thread.sleep(0)` does not actually wait, so the `while` loop in `throttle()` becomes a CPU-intensive busy-wait; if it is negative, `Thread.sleep()` throws an `IllegalArgumentException`.

```java
  public ReactiveThrottler(
      long windowMs, long bucketMs, double overloadRatio, String namespace, int throttleDelaySecs) {
    super(windowMs, bucketMs, overloadRatio);
    if (throttleDelaySecs <= 0) {
      throw new IllegalArgumentException("throttleDelaySecs must be greater than 0");
    }
    this.throttlingSignaler = new ThrottlingSignaler(namespace);
    this.throttleDelaySecs = throttleDelaySecs;
  }
```
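Taken together, the validation, synchronization, and bounded-wait suggestions can be sketched in one small class. This is a hypothetical illustration rather than the PR's code: `BoundedThrottlerSketch` and its `maxDelays` cap are assumptions, and plain cumulative counters replace the `MovingSum` windows so the example stays short (unlike the real throttler, old history never ages out here).

```java
import java.util.Random;

// Hypothetical sketch, not the PR's implementation. Cumulative counters stand in
// for the MovingSum windows only to keep the example short.
final class BoundedThrottlerSketch {
  // The "+1" from the SRE-book formula, mirroring MIN_REQUESTS in the diff.
  static final int MIN_REQUESTS = 1;

  private final double overloadRatio;
  private final long throttleDelayMs;
  private final int maxDelays; // assumption: cap on sleeps per request
  private final Random random;
  private long allRequests;
  private long successfulRequests;

  BoundedThrottlerSketch(
      double overloadRatio, int throttleDelaySecs, int maxDelays, Random random) {
    if (overloadRatio <= 1.0) {
      throw new IllegalArgumentException("overloadRatio must be greater than 1.0");
    }
    if (throttleDelaySecs <= 0) {
      throw new IllegalArgumentException("throttleDelaySecs must be greater than 0");
    }
    if (maxDelays < 0) {
      throw new IllegalArgumentException("maxDelays must not be negative");
    }
    this.overloadRatio = overloadRatio;
    this.throttleDelayMs = throttleDelaySecs * 1000L;
    this.maxDelays = maxDelays;
    this.random = random;
  }

  // Same formula as throttlingProbability() in the diff, applied to the counters.
  synchronized double throttlingProbability() {
    double prob =
        (allRequests - overloadRatio * successfulRequests) / (allRequests + MIN_REQUESTS);
    return Math.max(0.0, prob);
  }

  // Counts the attempt, then decides randomly whether to hold it back.
  synchronized boolean throttleRequest() {
    double prob = throttlingProbability();
    allRequests++;
    return random.nextDouble() < prob;
  }

  synchronized void successfulRequest() {
    successfulRequests++;
  }

  // Deliberately not synchronized: the lock is held only inside throttleRequest(),
  // so other threads can keep recording requests while this one sleeps.
  // Returns how many delays were applied before the caller should proceed.
  int throttle() throws InterruptedException {
    int delays = 0;
    while (throttleRequest()) {
      if (delays >= maxDelays) {
        break; // stop waiting and let the request through
      }
      Thread.sleep(throttleDelayMs);
      delays++;
    }
    return delays;
  }
}
```

Only `throttleRequest()` and `successfulRequest()` take the lock, so a thread sleeping in `throttle()` does not block other threads. The `maxDelays` cap is one way to bound the wait while keeping the re-check behavior that the `throttleRequest` Javadoc describes. Setting it to 1 is close to the single-`if` suggestion, except that the sketch re-checks (and counts) the request once more after sleeping.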
