denis-chudov commented on code in PR #7680:
URL: https://github.com/apache/ignite-3/pull/7680#discussion_r2940113716


##########
modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link ExponentialBackoffTimeoutStrategy}.
+ *
+ * <p>Verifies the correctness of exponential timeout progression, the maximum 
timeout
+ * ceiling, and optional jitter behavior. Tests use predictable integer 
arithmetic to
+ * make expected values easy to verify by hand.
+ */
+public class ExponentialBackoffTimeoutStrategyTest {
+    /** Initial timeout passed to {@link TimeoutStrategy#next(int)} as the 
starting value. */
+    private static final int INITIAL_TIMEOUT = 20;
+
+    /** Maximum timeout the strategy is allowed to produce. */
+    private static final int MAX_TIMEOUT = 100;
+
+    /** Backoff coefficient used by the default strategy instance. Doubles 
each timeout step. */
+    private static final double BACKOFF_COEFFICIENT = 2.0;
+
+    /** Strategy instance under test, recreated before each test. */
+    private TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Creates a fresh {@link ExponentialBackoffTimeoutStrategy} with {@link 
#MAX_TIMEOUT}
+     * and {@link #BACKOFF_COEFFICIENT}, without jitter, before each test.
+     */
+    @BeforeEach
+    void setUp() {
+        timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, 
BACKOFF_COEFFICIENT);
+    }
+
+    /**
+     * Verifies that a single call to {@link TimeoutStrategy#next(int)} returns
+     * {@code currentTimeout * backoffCoefficient}.
+     *
+     * <p>This is the core contract of the exponential strategy — each step 
multiplies
+     * the current timeout by the configured coefficient.
+     */
+    @Test
+    void testGettingNextTimeout() {
+        assertEquals(BACKOFF_COEFFICIENT * INITIAL_TIMEOUT, 
timeoutStrategy.next(INITIAL_TIMEOUT));
+    }
+
+    /**
+     * Verifies that the timeout progression reaches {@link #MAX_TIMEOUT} 
within the
+     * expected number of steps and does not exceed it on subsequent calls.
+     *
+     * <p>The upper bound on steps is computed from the coefficient and the 
ratio of
+     * {@code MAX_TIMEOUT} to {@code INITIAL_TIMEOUT}. If the strategy fails 
to reach
+     * the cap within this bound, the test fails with a descriptive message. 
Once the
+     * cap is reached, a further call to {@link TimeoutStrategy#next(int)} 
must return
+     * exactly {@link #MAX_TIMEOUT}.
+     */
+    @Test
+    void testMaxTimeoutNotExceeded() {
+        int maxSteps = 3;
+        int steps = 0;
+
+        int timeout = INITIAL_TIMEOUT;
+        do {
+            timeout  = timeoutStrategy.next(timeout);
+
+            assertTrue(++steps <= maxSteps,
+                    "Strategy did not reach MAX_TIMEOUT within expected steps, 
last timeout: " + timeout);
+        } while (timeout < MAX_TIMEOUT);
+
+        assertEquals(MAX_TIMEOUT, timeout);
+        assertEquals(MAX_TIMEOUT, timeoutStrategy.next(timeout));
+    }
+
+    /**
+     * Verifies that when jitter is enabled, the returned timeout falls within 
the
+     * expected randomized range {@code [initialTimeout, initialTimeout * 
coefficient^2)}.
+     *
+     * <p>A separate strategy instance with jitter enabled is created for this 
test.
+     * The lower bound confirms the jitter does not produce values below the 
initial
+     * timeout; the upper bound confirms it does not jump more than two 
coefficient
+     * steps in a single call.
+     */
+    @Test
+    void testJitterApplying() {

Review Comment:
   Please add the test checking that jitter never causes the exceeding of max 
timeout



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@link TimeoutStrategy} that increases retry timeouts exponentially on 
each attempt.
+ *
+ * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code 
backoffCoefficient},
+ * capping the result at {@link #maxTimeout()}. Optionally, random jitter can 
be applied to spread
+ * retry attempts across time and avoid thundering herd problems under high 
concurrency.
+ *
+ * <p>When jitter is enabled, the returned timeout is randomized within the 
range
+ * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}.
+ *
+ * <p>This class is stateless and thread-safe. A single instance can be shared 
across
+ * multiple retry contexts.
+ */
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
+    /** Default backoff coefficient applied on each retry step. Doubles the 
timeout per attempt. */
+    private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
+
+    /**
+     * Multiplier applied to the current timeout on each call to {@link 
#next(int)}.
+     * Must be greater than {@code 1.0} to produce a growing sequence.
+     */
+    private final double backoffCoefficient;
+
+    /**
+     * Whether to apply random jitter to the computed timeout.
+     * When {@code true}, the result is randomized within {@code [raw / 2, raw 
* 1.5]}.
+     */
+    private final boolean jitter;
+
+    /**
+     * Upper bound for the timeout produced by this strategy, in milliseconds.
+     * The result of {@link #next(int)} is always capped at this value.
+     */
+    private final int maxTimeout;
+
+    /**
+     * Creates a strategy with default max timeout and backoff coefficient, 
and no jitter.
+     *
+     * @see TimeoutStrategy#DEFAULT_TIMEOUT_MS_MAX
+     */
+    public ExponentialBackoffTimeoutStrategy() {
+        this(DEFAULT_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT);
+    }
+
+    /**
+     * Creates a strategy with the given max timeout and backoff coefficient, 
and no jitter.
+     *
+     * @param maxTimeout         maximum timeout this strategy may produce, in 
milliseconds.
+     * @param backoffCoefficient multiplier applied to the current timeout on 
each step.
+     *                           Must be greater than {@code 1.0}.
+     */
+    public ExponentialBackoffTimeoutStrategy(
+            int maxTimeout,
+            double backoffCoefficient
+    ) {
+        this(maxTimeout, backoffCoefficient, false);
+    }
+
+    /**
+     * Creates a strategy with the given max timeout, backoff coefficient, and 
jitter setting.
+     *
+     * @param maxTimeout         maximum timeout this strategy may produce, in 
milliseconds.
+     * @param backoffCoefficient multiplier applied to the current timeout on 
each step.
+     *                           Must be greater than {@code 1.0}.
+     * @param jitter             if {@code true}, random jitter is applied to 
each computed timeout.
+     */
+    public ExponentialBackoffTimeoutStrategy(
+            int maxTimeout,
+            double backoffCoefficient,
+            boolean jitter
+    ) {
+        this.maxTimeout = maxTimeout;
+        this.backoffCoefficient = backoffCoefficient;
+        this.jitter = jitter;
+    }
+
+    /**
+     * Computes the next retry timeout by multiplying {@code currentTimeout} by
+     * {@link #backoffCoefficient}, then capping at {@link #maxTimeout}.
+     * If jitter is enabled, the result is further randomized via {@link 
#applyJitter(int)}.
+     *
+     * @param currentTimeout current retry timeout in milliseconds.
+     * @return next retry timeout in milliseconds, capped at {@link 
#maxTimeout}.
+     */
+    @Override
+    public int next(int currentTimeout) {
+        int raw = (int) Math.min((currentTimeout * backoffCoefficient), 
maxTimeout);
+
+        return jitter ? applyJitter(raw) : raw;

Review Comment:
   here you have two `Math.min` - here and in `applyJitter`. I would suggest to 
apply max timeout after the jitter



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A retry context that tracks timeout state independently per key.
+ *
+ * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff 
progression
+ * for different retry targets — for example, different replication group IDs 
or transaction IDs.
+ * State updates are performed atomically per key using {@link 
ConcurrentHashMap#compute}.
+ *
+ * <p>To prevent unbounded memory growth, the registry is capped at {@link 
#REGISTRY_SIZE_LIMIT}
+ * entries. Once the limit is reached, untracked keys receive a fixed {@link 
#fallbackTimeoutState}
+ * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a 
soft cap and may be
+ * slightly exceeded under concurrent insertions.
+ *
+ * <p>This class is thread-safe.
+ */
+public class KeyBasedRetryContext {
+    /**
+     * Maximum number of keys tracked in {@link #registry}.
+     * Can be slightly exceeded under concurrent insertions. See class-level 
Javadoc.
+     */
+    private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+    /**
+     * Timeout used when creating a new {@link TimeoutState} for a key that 
has no prior state.
+     * Also used as the reset value when a key's state is removed.
+     */
+    private final int initialTimeout;
+
+    /** Strategy used to compute the next timeout from the current one on each 
advancement. */
+    private final TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Sentinel state returned for keys that cannot be tracked because the 
registry is full.
+     * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt 
{@code -1} to distinguish
+     * it from legitimately tracked states.
+     */
+    private final TimeoutState fallbackTimeoutState;
+
+    /** Per-key timeout state registry. Keys are typically transaction IDs or 
replication group IDs. */
+    private final ConcurrentHashMap<String, TimeoutState> registry = new 
ConcurrentHashMap<>();
+
+    /**
+     * Creates a new context with the given initial timeout and strategy.
+     *
+     * @param initialTimeout timeout used for the first retry attempt of any 
new key, in milliseconds.
+     * @param timeoutStrategy strategy used to compute subsequent timeout 
values.
+     */
+    public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy 
timeoutStrategy) {
+        this.initialTimeout = initialTimeout;
+        this.timeoutStrategy = timeoutStrategy;
+
+        this.fallbackTimeoutState = new 
TimeoutState(timeoutStrategy.maxTimeout(), -1);
+    }
+
+    /**
+     * Returns the current {@link TimeoutState} for the given key, if tracked.
+     *
+     * <p>Returns an empty {@link Optional} if the key has no recorded state 
yet.
+     * If the registry is full and the key is not already tracked, returns
+     * {@link Optional} containing the {@link #fallbackTimeoutState}.
+     *
+     * <p>This method does not insert the key into the registry.
+     *
+     * @param key the key to look up, typically a transaction ID or 
replication group ID.
+     * @return current state for the key, fallback state if registry is full, 
or empty if not tracked.
+     */
+    public Optional<TimeoutState> getState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return of(fallbackTimeoutState);
+        }
+
+        return ofNullable(registry.get(key));
+    }
+
+    /**
+     * Atomically advances the retry state for the given key and returns the 
updated state.
+     *
+     * <p>If the key has no prior state, a new {@link TimeoutState} is created 
with
+     * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the 
timeout is
+     * advanced using {@link TimeoutStrategy#next(int)} and the attempt count 
is incremented.
+     *
+     * <p>The update is performed inside {@link ConcurrentHashMap#compute}, 
which holds
+     * an exclusive per-key lock for the duration of the lambda. The CAS on the
+     * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always 
expected

Review Comment:
   Please add qualifier



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/SharedRetryContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A retry context that tracks a single shared timeout state across all 
callers.
+ *
+ * <p>Unlike {@link KeyBasedRetryContext}, this context does not distinguish 
between
+ * retry targets — all callers advance and observe the same {@link 
TimeoutState}.
+ * This is appropriate when a single backoff sequence should govern all retries
+ * regardless of which operation is being retried.
+ *
+ * <p>The state is initialized lazily on the first call to {@link 
#updateAndGetState()},
+ * and can be reset to {@code null} via {@link #resetState()}, allowing the 
progression
+ * to restart from scratch. {@link #getState()} returns an empty {@link 
Optional} before
+ * the first call and after a reset.
+ *
+ * <p>Concurrent calls to {@link #updateAndGetState()} and {@link 
#resetState()} are safe.
+ * The {@link AtomicReference} handles structural transitions ({@code null ↔ 
initialized}),
+ * while the {@link TimeoutState}'s internal {@link AtomicLong} CAS handles 
concurrent

Review Comment:
   Please add qualifier.



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A retry context that tracks timeout state independently per key.
+ *
+ * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff 
progression
+ * for different retry targets — for example, different replication group IDs 
or transaction IDs.
+ * State updates are performed atomically per key using {@link 
ConcurrentHashMap#compute}.
+ *
+ * <p>To prevent unbounded memory growth, the registry is capped at {@link 
#REGISTRY_SIZE_LIMIT}
+ * entries. Once the limit is reached, untracked keys receive a fixed {@link 
#fallbackTimeoutState}
+ * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a 
soft cap and may be
+ * slightly exceeded under concurrent insertions.
+ *
+ * <p>This class is thread-safe.
+ */
+public class KeyBasedRetryContext {
+    /**
+     * Maximum number of keys tracked in {@link #registry}.
+     * Can be slightly exceeded under concurrent insertions. See class-level 
Javadoc.
+     */
+    private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+    /**
+     * Timeout used when creating a new {@link TimeoutState} for a key that 
has no prior state.
+     * Also used as the reset value when a key's state is removed.
+     */
+    private final int initialTimeout;
+
+    /** Strategy used to compute the next timeout from the current one on each 
advancement. */
+    private final TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Sentinel state returned for keys that cannot be tracked because the 
registry is full.
+     * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt 
{@code -1} to distinguish
+     * it from legitimately tracked states.
+     */
+    private final TimeoutState fallbackTimeoutState;
+
+    /** Per-key timeout state registry. Keys are typically transaction IDs or 
replication group IDs. */
+    private final ConcurrentHashMap<String, TimeoutState> registry = new 
ConcurrentHashMap<>();
+
+    /**
+     * Creates a new context with the given initial timeout and strategy.
+     *
+     * @param initialTimeout timeout used for the first retry attempt of any 
new key, in milliseconds.
+     * @param timeoutStrategy strategy used to compute subsequent timeout 
values.
+     */
+    public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy 
timeoutStrategy) {
+        this.initialTimeout = initialTimeout;
+        this.timeoutStrategy = timeoutStrategy;
+
+        this.fallbackTimeoutState = new 
TimeoutState(timeoutStrategy.maxTimeout(), -1);
+    }
+
+    /**
+     * Returns the current {@link TimeoutState} for the given key, if tracked.
+     *
+     * <p>Returns an empty {@link Optional} if the key has no recorded state 
yet.
+     * If the registry is full and the key is not already tracked, returns
+     * {@link Optional} containing the {@link #fallbackTimeoutState}.
+     *
+     * <p>This method does not insert the key into the registry.
+     *
+     * @param key the key to look up, typically a transaction ID or 
replication group ID.
+     * @return current state for the key, fallback state if registry is full, 
or empty if not tracked.
+     */
+    public Optional<TimeoutState> getState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return of(fallbackTimeoutState);
+        }
+
+        return ofNullable(registry.get(key));
+    }
+
+    /**
+     * Atomically advances the retry state for the given key and returns the 
updated state.
+     *
+     * <p>If the key has no prior state, a new {@link TimeoutState} is created 
with
+     * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the 
timeout is
+     * advanced using {@link TimeoutStrategy#next(int)} and the attempt count 
is incremented.
+     *
+     * <p>The update is performed inside {@link ConcurrentHashMap#compute}, 
which holds
+     * an exclusive per-key lock for the duration of the lambda. The CAS on the
+     * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always 
expected
+     * to succeed on the first attempt within the lambda.
+     *
+     * <p>If the registry is full and the key is not already tracked, returns
+     * {@link #fallbackTimeoutState} without modifying the registry.
+     *
+     * @param key the key to advance state for, typically a transaction ID or 
replication group ID.
+     * @return updated {@link TimeoutState} for the key, or {@link 
#fallbackTimeoutState}
+     *         if the registry is full.
+     */
+    public TimeoutState updateAndGetState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return fallbackTimeoutState;
+        }
+
+        return registry.compute(key, (k, state) -> {
+            if (state == null) {
+                return new TimeoutState(initialTimeout, 1);
+            }
+
+            long currentState = state.getRawState();
+            state.update(currentState, 
timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1);

Review Comment:
   So, we get the raw state just to unpack its components and use them as 
arguments for another call to the same object. This looks like unnecessary 
leakage of internal state.
   I would suggest `TimeoutState#update(timeoutStrategy)` method that would 
keep the internal state hidden



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/SharedRetryContext.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A retry context that tracks a single shared timeout state across all 
callers.
+ *
+ * <p>Unlike {@link KeyBasedRetryContext}, this context does not distinguish 
between
+ * retry targets — all callers advance and observe the same {@link 
TimeoutState}.
+ * This is appropriate when a single backoff sequence should govern all retries
+ * regardless of which operation is being retried.
+ *
+ * <p>The state is initialized lazily on the first call to {@link 
#updateAndGetState()},
+ * and can be reset to {@code null} via {@link #resetState()}, allowing the 
progression
+ * to restart from scratch. {@link #getState()} returns an empty {@link 
Optional} before
+ * the first call and after a reset.
+ *
+ * <p>Concurrent calls to {@link #updateAndGetState()} and {@link 
#resetState()} are safe.
+ * The {@link AtomicReference} handles structural transitions ({@code null ↔ 
initialized}),
+ * while the {@link TimeoutState}'s internal {@link AtomicLong} CAS handles 
concurrent
+ * value updates without allocating new objects on the hot path.
+ *
+ * <p>This class is thread-safe.
+ */
+public class SharedRetryContext {

Review Comment:
   Shouldn't it share the same interface with KeyBasedRetryContext?



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
+import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A retry context that tracks timeout state independently per key.
+ *
+ * <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff 
progression
+ * for different retry targets — for example, different replication group IDs 
or transaction IDs.
+ * State updates are performed atomically per key using {@link 
ConcurrentHashMap#compute}.
+ *
+ * <p>To prevent unbounded memory growth, the registry is capped at {@link 
#REGISTRY_SIZE_LIMIT}
+ * entries. Once the limit is reached, untracked keys receive a fixed {@link 
#fallbackTimeoutState}
+ * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a 
soft cap and may be
+ * slightly exceeded under concurrent insertions.
+ *
+ * <p>This class is thread-safe.
+ */
+public class KeyBasedRetryContext {
+    /**
+     * Maximum number of keys tracked in {@link #registry}.
+     * Can be slightly exceeded under concurrent insertions. See class-level 
Javadoc.
+     */
+    private static final int REGISTRY_SIZE_LIMIT = 1_000;
+
+    /**
+     * Timeout used when creating a new {@link TimeoutState} for a key that 
has no prior state.
+     * Also used as the reset value when a key's state is removed.
+     */
+    private final int initialTimeout;
+
+    /** Strategy used to compute the next timeout from the current one on each 
advancement. */
+    private final TimeoutStrategy timeoutStrategy;
+
+    /**
+     * Sentinel state returned for keys that cannot be tracked because the 
registry is full.
+     * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt 
{@code -1} to distinguish
+     * it from legitimately tracked states.
+     */
+    private final TimeoutState fallbackTimeoutState;
+
+    /** Per-key timeout state registry. Keys are typically transaction IDs or 
replication group IDs. */
+    private final ConcurrentHashMap<String, TimeoutState> registry = new 
ConcurrentHashMap<>();
+
+    /**
+     * Creates a new context with the given initial timeout and strategy.
+     *
+     * @param initialTimeout timeout used for the first retry attempt of any 
new key, in milliseconds.
+     * @param timeoutStrategy strategy used to compute subsequent timeout 
values.
+     */
+    public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy 
timeoutStrategy) {
+        this.initialTimeout = initialTimeout;
+        this.timeoutStrategy = timeoutStrategy;
+
+        this.fallbackTimeoutState = new 
TimeoutState(timeoutStrategy.maxTimeout(), -1);
+    }
+
+    /**
+     * Returns the current {@link TimeoutState} for the given key, if tracked.
+     *
+     * <p>Returns an empty {@link Optional} if the key has no recorded state 
yet.
+     * If the registry is full and the key is not already tracked, returns
+     * {@link Optional} containing the {@link #fallbackTimeoutState}.
+     *
+     * <p>This method does not insert the key into the registry.
+     *
+     * @param key the key to look up, typically a transaction ID or 
replication group ID.
+     * @return current state for the key, fallback state if registry is full, 
or empty if not tracked.
+     */
+    public Optional<TimeoutState> getState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return of(fallbackTimeoutState);
+        }
+
+        return ofNullable(registry.get(key));
+    }
+
+    /**
+     * Atomically advances the retry state for the given key and returns the 
updated state.
+     *
+     * <p>If the key has no prior state, a new {@link TimeoutState} is created 
with
+     * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the 
timeout is
+     * advanced using {@link TimeoutStrategy#next(int)} and the attempt count 
is incremented.
+     *
+     * <p>The update is performed inside {@link ConcurrentHashMap#compute}, 
which holds
+     * an exclusive per-key lock for the duration of the lambda. The CAS on the
+     * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always 
expected
+     * to succeed on the first attempt within the lambda.
+     *
+     * <p>If the registry is full and the key is not already tracked, returns
+     * {@link #fallbackTimeoutState} without modifying the registry.
+     *
+     * @param key the key to advance state for, typically a transaction ID or 
replication group ID.
+     * @return updated {@link TimeoutState} for the key, or {@link 
#fallbackTimeoutState}
+     *         if the registry is full.
+     */
+    public TimeoutState updateAndGetState(String key) {
+        if (!registry.containsKey(key) && registry.size() >= 
REGISTRY_SIZE_LIMIT) {
+            return fallbackTimeoutState;
+        }
+
+        return registry.compute(key, (k, state) -> {
+            if (state == null) {
+                return new TimeoutState(initialTimeout, 1);
+            }
+
+            long currentState = state.getRawState();
+            state.update(currentState, 
timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1);
+
+            return state;
+        });
+    }
+
+    /**
+     * Removes the retry state for the given key, resetting it as if no 
retries had occurred.
+     *
+     * <p>The next call to {@link #updateAndGetState(String)} for this key 
after a reset
+     * will create fresh state starting from {@link #initialTimeout}.
+     *
+     * @param key the key whose state should be removed.
+     */
+    public void resetState(String key) {

Review Comment:
   This is called on successfull attempt of retry. But what if it will never 
happen? 1000 transactions/groups will hang forever in this map, causing 
everything else to fallback on maximum timeout?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -401,12 +412,15 @@ public TxManagerImpl(
 
         transactionExpirationRegistry = new 
TransactionExpirationRegistry(txStateVolatileStorage);
 
+        retryContext = new KeyBasedRetryContext(20, timeoutStrategy);

Review Comment:
   You replaced `RETRY_INITIAL_TIMEOUT_MS` constant, but now the initial 
timeout became just a "magic number"



##########
modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@link TimeoutStrategy} that increases retry timeouts exponentially on 
each attempt.
+ *
+ * <p>Each call to {@link #next(int)} multiplies the current timeout by {@code 
backoffCoefficient},
+ * capping the result at {@link #maxTimeout()}. Optionally, random jitter can 
be applied to spread
+ * retry attempts across time and avoid thundering herd problems under high 
concurrency.
+ *
+ * <p>When jitter is enabled, the returned timeout is randomized within the 
range
+ * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}.
+ *
+ * <p>This class is stateless and thread-safe. A single instance can be shared 
across
+ * multiple retry contexts.
+ */
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {

Review Comment:
   I think, we should add TODO here to replace timeout strategy in 
`ignite-raft` module with this one.



##########
modules/core/src/test/java/org/apache/ignite/internal/util/retry/SharedRetryContextTest.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.retry;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Unit tests for {@link SharedRetryContext}.
+ *
+ * <p>Verifies lazy initialization, sequential timeout progression, reset 
behavior,
+ * and thread safety of concurrent updates. A deterministic {@link 
TestProgressiveTimeoutStrategy}
+ * with a fixed multiplier is used to make expected timeout values easy to 
compute by hand.
+ */
+public class SharedRetryContextTest {

Review Comment:
   This is the single place where `SharedRetryContext` is created, is it really 
needed in this PR?



##########
modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java:
##########
@@ -52,6 +98,50 @@ public void setup() {
         sql(tableSql);
     }
 
+    /**
+     * Verifies that no retry occurs when the write intent cleanup succeeds on 
the first attempt.
+     *
+     * <p>Installs a message interceptor that counts {@link 
WriteIntentSwitchReplicaRequest}
+     * messages without dropping any of them. After all write intents are 
resolved, asserts
+     * that exactly one cleanup message was sent across all nodes.
+     */
+    @Test
+    public void testNoRetryOnSuccessfulCleanup() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) 
values (1, 'val-1')");
+
+        AtomicInteger cleanupAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof WriteIntentSwitchReplicaRequest) {
+                    cleanupAttempts.incrementAndGet();
+                }
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, TimeUnit.SECONDS)

Review Comment:
   `TimeUnit` not needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to