ptupitsyn commented on code in PR #6779:
URL: https://github.com/apache/ignite-3/pull/6779#discussion_r2534550849
##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java:
##########
@@ -131,6 +133,10 @@ public static void writeTx(@Nullable Transaction tx,
PayloadOutputChannel out, @
out.out().packLong(tx0.observableTimestamp());
out.out().packBoolean(tx.isReadOnly());
out.out().packLong(tx0.timeout());
+ if
(ctx.channel.protocolContext().isFeatureSupported(TX_CLIENT_GETALL_SUPPORTS_PRIORITY)
Review Comment:
Is there a chance that we need `lowPriority` flag for other operations in
the future? We could pass it to all operations and use a more general name for
the feature flag. Maybe even replace a boolean with a flags varint so it is
easier to extend later.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_PRIORITY;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.client.table.ClientTable.Batch;
+import org.apache.ignite.internal.client.table.ClientTable.Reducer;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides batch map utility methods.
+ */
+class ClientTableMapUtils {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27073
+ private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS =
TimeUnit.MILLISECONDS.toNanos(5000);
+
+ static <R, E> void mapAndRetry(
+ MapFunction<E, R> mapFun,
+ @Nullable R initialValue, Reducer<R> reducer,
+ List<Transaction> txns,
+ Map<Integer, List<E>> mapped,
+ long[] startTs,
+ CompletableFuture<R> resFut,
+ IgniteLogger log
+ ) {
+ if (startTs[0] == 0) {
+ startTs[0] = System.nanoTime();
+ }
+
+ List<CompletableFuture<R>> res = new ArrayList<>();
+
+ for (Entry<Integer, List<E>> entry : mapped.entrySet()) {
+ res.add(mapFun.apply(entry.getValue(),
PartitionAwarenessProvider.of(entry.getKey()), mapped.size() > 1));
+ }
+
+ CompletableFutures.allOf(res).handle((ignored, err) -> {
+ List<CompletableFuture<Void>> waitCommitFuts = List.of();
+ if (!txns.isEmpty()) {
+ if (err != null) {
+ boolean needRetry = unlockOnRetry(txns, res, log);
+
+ long nowRelative = System.nanoTime();
+ if (needRetry && nowRelative - startTs[0] <
DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
+ startTs[0] = nowRelative;
+ txns.clear(); // The collection is re-filled on next
map attempt.
+
+ mapAndRetry(mapFun, initialValue, reducer, txns,
mapped, startTs, resFut, log);
+
+ return null;
+ }
+
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+
+ waitCommitFuts = unlockFragments(txns, log);
+ } else {
+ if (err != null) {
+ resFut.completeExceptionally(err);
+
+ return null;
+ }
+ }
+
+ R in = initialValue;
+
+ for (CompletableFuture<R> val : res) {
+ in = reducer.reduce(in, val.getNow(null));
+ }
+
+ if (waitCommitFuts.isEmpty()) {
+ resFut.complete(in);
+ } else {
+ R finalIn = in;
+ CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e)
-> {
+ // Ignore errors.
+ resFut.complete(finalIn);
+ });
+ }
+
+ return null;
+ });
+ }
+
+ static <E> void mapAndRetry(
Review Comment:
This method looks identical to the above except for the reducer, can we
extract the common part?
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_PRIORITY;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.client.table.ClientTable.Batch;
+import org.apache.ignite.internal.client.table.ClientTable.Reducer;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides batch map utility methods.
+ */
+class ClientTableMapUtils {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27073
+ private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS =
TimeUnit.MILLISECONDS.toNanos(5000);
Review Comment:
5 seconds is way too small for GetAll and might break existing use cases. We
don't have a timeout for other operations, so this introduces inconsistent
behavior. If we want to limit retries, there is `RetryPolicy`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]