This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 161f38b58064404e78b421c8d9b6e493e6222000
Author: ran <gaoran...@126.com>
AuthorDate: Mon May 31 15:48:19 2021 +0800

    Fix kinesis sink backoff class not found (#10744)
    
    * copy the class `Backoff` to the project Kinesis connector
    
    (cherry picked from commit e19f6472183ac93dc058a96f7ac0537c22d90e18)
---
 .../java/org/apache/pulsar/io/kinesis/Backoff.java | 128 +++++++++++++++++++++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |   1 -
 2 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java
new file mode 100644
index 0000000..babacd4
--- /dev/null
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kinesis;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+
+
+/**
+ * All variables are in TimeUnit millis by default.
+  */
+@Data
+public class Backoff {
+    public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
+    public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
+    private final long initial;
+    private final long max;
+    private final Clock clock;
+    private long next;
+    private long mandatoryStop;
+
+    private long firstBackoffTimeInMillis;
+    private boolean mandatoryStopMade = false;
+
+    private static final Random random = new Random();
+
+    @VisibleForTesting
+    Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+            TimeUnit unitMandatoryStop, Clock clock) {
+        this.initial = unitInitial.toMillis(initial);
+        this.max = unitMax.toMillis(max);
+        this.next = this.initial;
+        this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
+        this.clock = clock;
+    }
+
+    public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
+                   TimeUnit unitMandatoryStop) {
+        this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
+    }
+
+    public long next() {
+        long current = this.next;
+        if (current < max) {
+            this.next = Math.min(this.next * 2, this.max);
+        }
+
+        // Check for mandatory stop
+        if (!mandatoryStopMade) {
+            long now = clock.millis();
+            long timeElapsedSinceFirstBackoff = 0;
+            if (initial == current) {
+                firstBackoffTimeInMillis = now;
+            } else {
+                timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
+            }
+
+            if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
+                current = Math.max(initial, mandatoryStop - timeElapsedSinceFirstBackoff);
+                mandatoryStopMade = true;
+            }
+        }
+
+        // Randomly decrease the timeout up to 10% to avoid simultaneous retries
+        // If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive"
+        if (current > 10) {
+            current -= random.nextInt((int) current / 10);
+        }
+        return Math.max(initial, current);
+    }
+
+    public void reduceToHalf() {
+        if (next > initial) {
+            this.next = Math.max(this.next / 2, this.initial);
+        }
+    }
+
+    public void reset() {
+        this.next = this.initial;
+        this.mandatoryStopMade = false;
+    }
+
+    @VisibleForTesting
+    long getFirstBackoffTimeInMillis() {
+        return firstBackoffTimeInMillis;
+    }
+
+    public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
+                                                                       long defaultInterval, long maxBackoffInterval) {
+       long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
+        long currentTime = System.nanoTime();
+        long interval = defaultInterval;
+        for (int i = 1; i < failedAttempts; i++) {
+            interval = interval * 2;
+            if (interval > maxBackoffInterval) {
+                interval = maxBackoffInterval;
+                break;
+            }
+        }
+
+        // if the current time is less than the time at which next retry should occur, we should backoff
+        return currentTime < (initialTimestampInNano + interval);
+    }
+
+    public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
+        return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
+                                                                DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
+    }
+}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index adc7c3c..ed7c70c 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;

Reply via email to