This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch ignite-21776
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 6763140a3c5b884fcefb3164e4a3073e296c707e
Author: zstan <zs...@apache.com>
AuthorDate: Tue Mar 26 18:22:05 2024 +0300

    IGNITE-21776 Sql. Concurrent create table command need to wait already 
processed result
---
 .../internal/catalog/CatalogManagerImpl.java       | 42 ++++++++++++++++-
 .../CatalogVersionAwareValidationException.java    | 41 +++++++++++++++++
 .../internal/catalog/CatalogManagerSelfTest.java   | 52 ++++++++++++++++++++++
 3 files changed, 133 insertions(+), 2 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 9a709821ca..35622456d4 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -374,7 +374,42 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
 
     private CompletableFuture<Integer> 
saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
         return saveUpdate(updateProducer, 0)
-                .thenCompose(newVersion -> {
+                .handle((newVersion, err) -> {
+                    if (err != null) {
+                        Throwable error;
+                        Catalog catalog;
+
+                        if (err instanceof 
CatalogVersionAwareValidationException) {
+                            CatalogVersionAwareValidationException err0 = 
(CatalogVersionAwareValidationException) err;
+                            catalog = catalogByVer.get(err0.version());
+                            error = err0.initial();
+                        } else {
+                            catalog = catalogByVer.lastEntry().getValue();
+                            error = err;
+                        }
+
+                        if (catalog.version() == 0) {
+                            return failedFuture(error).thenApply(unused -> 
newVersion);
+                        }
+
+                        HybridTimestamp tsSafeForRoReadingInPastOptimization =
+                                clusterWideEnsuredActivationTsSafeForRoReads(
+                                        catalog,
+                                        
partitionIdleSafeTimePropagationPeriodMsSupplier,
+                                        clockService.maxClockSkewMillis()
+                        );
+
+                        return 
clockService.waitFor(tsSafeForRoReadingInPastOptimization)
+                                .handle((res, ex) -> {
+                                    if (ex != null) {
+                                        error.addSuppressed(ex);
+                                    }
+                                    return failedFuture(error);
+                                })
+                                .thenCompose(unused -> failedFuture(error))
+                                .thenApply(unused -> newVersion);
+                    }
+
                     Catalog catalog = catalogByVer.get(newVersion);
 
                     HybridTimestamp tsSafeForRoReadingInPastOptimization = 
clusterWideEnsuredActivationTsSafeForRoReads(
@@ -384,7 +419,8 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
                     );
 
                     return 
clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> 
newVersion);
-                });
+                })
+                .thenCompose(Function.identity());
     }
 
     /**
@@ -410,6 +446,8 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
             List<UpdateEntry> updates;
             try {
                 updates = updateProducer.get(catalog);
+            } catch (CatalogValidationException ex) {
+                return failedFuture(new 
CatalogVersionAwareValidationException(ex, catalog.version()));
             } catch (Exception ex) {
                 return failedFuture(ex);
             }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java
new file mode 100644
index 0000000000..4243c7e489
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+/** Catalog version aware exception wrapper. */
+class CatalogVersionAwareValidationException extends 
CatalogValidationException {
+    private static final long serialVersionUID = -1482326886088511707L;
+
+    private final int version;
+    private final Throwable initial;
+
+    CatalogVersionAwareValidationException(Throwable exception, int version) {
+        super(exception.getMessage());
+
+        this.version = version;
+        initial = exception;
+    }
+
+    int version() {
+        return version;
+    }
+
+    Throwable initial() {
+        return initial;
+    }
+}
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index b7d5b6d970..d469b4eb2b 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -82,6 +82,7 @@ import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1162,6 +1163,57 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         }
     }
 
+    @Test
+    public void createTableIfNotExistWaitsActivationEvenIfTableExists() throws 
Exception {
+        long delayDuration = TimeUnit.DAYS.toMillis(365);
+
+        int partitionIdleSafeTimePropagationPeriod = 0;
+
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockService, delayDuration,
+                partitionIdleSafeTimePropagationPeriod);
+
+        manager.start();
+
+        try {
+            CatalogCommand createTableCommand = spy(simpleTable(TABLE_NAME));
+
+            CompletableFuture<Integer> createTableFuture1 = 
manager.execute(createTableCommand);
+
+            assertFalse(createTableFuture1.isDone());
+
+            ArgumentCaptor<VersionedUpdate> appendCapture = 
ArgumentCaptor.forClass(VersionedUpdate.class);
+
+            verify(updateLog).append(appendCapture.capture());
+
+            int catalogVerAfterTableCreate = 
appendCapture.getValue().version();
+
+            CompletableFuture<Integer> createTableFuture2 = 
manager.execute(createTableCommand);
+
+            verify(createTableCommand, times(2)).get(any());
+
+            assertFalse(createTableFuture2.isDone());
+
+            verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
+
+            Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
+
+            assertNotNull(catalog0);
+
+            HybridTimestamp activationSkew = 
CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads(
+                    catalog0,
+                    () -> partitionIdleSafeTimePropagationPeriod, 
clockService.maxClockSkewMillis());
+
+            clock.update(activationSkew);
+
+            assertTrue(waitForCondition(createTableFuture1::isDone, 2_000));
+            assertTrue(waitForCondition(createTableFuture2::isDone, 2_000));
+
+            assertSame(manager.schema(catalogVerAfterTableCreate), 
manager.activeSchema(clock.nowLong()));
+        } finally {
+            manager.stop();
+        }
+    }
+
     @Test
     public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
         UpdateLog updateLogMock = mock(UpdateLog.class);

Reply via email to