scwhittle commented on code in PR #33690:
URL: https://github.com/apache/beam/pull/33690#discussion_r1923386312


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java:
##########
@@ -23,7 +23,6 @@
 import java.util.Map;
 import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
 import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
-import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;

Review Comment:
   separate change?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########
@@ -17,25 +17,29 @@
  */
 package org.apache.beam.runners.dataflow.worker.streaming;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Queue bounded by a {@link WeightedSemaphore}. */
-public final class WeightedBoundedQueue<V> {
+public final class WeightedBoundedQueue<V extends @NonNull Object> {

Review Comment:
   we might want to make this less generally named or at least comment on 
spinning so we know that before it gets used somewhere else.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########
@@ -60,14 +64,29 @@ public void put(V value) {
    * Retrieves and removes the head of this queue, waiting up to the specified 
wait time if
    * necessary for an element to become available.
    *
-   * @param timeout how long to wait before giving up, in units of {@code unit}
-   * @param unit a {@code TimeUnit} determining how to interpret the {@code 
timeout} parameter
+   * @param timeout how long to wait before giving up
    * @return the head of this queue, or {@code null} if the specified waiting 
time elapses before an
    *     element is available
    * @throws InterruptedException if interrupted while waiting
    */
-  public @Nullable V poll(long timeout, TimeUnit unit) throws 
InterruptedException {
-    @Nullable V result = queue.poll(timeout, unit);
+  public @Nullable V poll(Duration timeout) throws InterruptedException {
+    @Nullable V result;
+    Instant deadline = Instant.now().plus(timeout);
+    int spin = 0;
+    while (true) {
+      if (++spin > 1000) {

Review Comment:
   how much cpu does this burn if the queue is empty?
   
   we could also look into other specialized queues like:
   
https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscBlockingConsumerArrayQueue.java
   (random googling, would need some validation).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to