This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-21776 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 6763140a3c5b884fcefb3164e4a3073e296c707e Author: zstan <zs...@apache.com> AuthorDate: Tue Mar 26 18:22:05 2024 +0300 IGNITE-21776 Sql. Concurrent create table command need to wait already processed result --- .../internal/catalog/CatalogManagerImpl.java | 42 ++++++++++++++++- .../CatalogVersionAwareValidationException.java | 41 +++++++++++++++++ .../internal/catalog/CatalogManagerSelfTest.java | 52 ++++++++++++++++++++++ 3 files changed, 133 insertions(+), 2 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index 9a709821ca..35622456d4 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -374,7 +374,42 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata private CompletableFuture<Integer> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) { return saveUpdate(updateProducer, 0) - .thenCompose(newVersion -> { + .handle((newVersion, err) -> { + if (err != null) { + Throwable error; + Catalog catalog; + + if (err instanceof CatalogVersionAwareValidationException) { + CatalogVersionAwareValidationException err0 = (CatalogVersionAwareValidationException) err; + catalog = catalogByVer.get(err0.version()); + error = err0.initial(); + } else { + catalog = catalogByVer.lastEntry().getValue(); + error = err; + } + + if (catalog.version() == 0) { + return failedFuture(error).thenApply(unused -> newVersion); + } + + HybridTimestamp tsSafeForRoReadingInPastOptimization = + clusterWideEnsuredActivationTsSafeForRoReads( + catalog, + partitionIdleSafeTimePropagationPeriodMsSupplier, + clockService.maxClockSkewMillis() + ); + + return clockService.waitFor(tsSafeForRoReadingInPastOptimization) + .handle((res, ex) -> { + if (ex != null) { + error.addSuppressed(ex); + } + return failedFuture(error); + }) + .thenCompose(unused -> failedFuture(error)) + .thenApply(unused -> newVersion); + } + Catalog catalog = catalogByVer.get(newVersion); HybridTimestamp tsSafeForRoReadingInPastOptimization = clusterWideEnsuredActivationTsSafeForRoReads( @@ -384,7 +419,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata ); return clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused -> newVersion); - }); + }) + .thenCompose(Function.identity()); } /** @@ -410,6 +446,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata List<UpdateEntry> updates; try { updates = updateProducer.get(catalog); + } catch (CatalogValidationException ex) { + return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version())); } catch (Exception ex) { return failedFuture(ex); } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java new file mode 100644 index 0000000000..4243c7e489 --- /dev/null +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogVersionAwareValidationException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.catalog; + +/** Catalog version aware exception wrapper. */ +class CatalogVersionAwareValidationException extends CatalogValidationException { + private static final long serialVersionUID = -1482326886088511707L; + + private final int version; + private final Throwable initial; + + CatalogVersionAwareValidationException(Throwable exception, int version) { + super(exception.getMessage()); + + this.version = version; + initial = exception; + } + + int version() { + return version; + } + + Throwable initial() { + return initial; + } +} diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index b7d5b6d970..d469b4eb2b 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -82,6 +82,7 @@ import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -1162,6 +1163,57 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { } } + @Test + public void createTableIfNotExistWaitsActivationEvenIfTableExists() throws Exception { + long delayDuration = TimeUnit.DAYS.toMillis(365); + + int partitionIdleSafeTimePropagationPeriod = 0; + + CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockService, delayDuration, + partitionIdleSafeTimePropagationPeriod); + + manager.start(); + + try { + CatalogCommand createTableCommand = spy(simpleTable(TABLE_NAME)); + + CompletableFuture<Integer> createTableFuture1 = manager.execute(createTableCommand); + + assertFalse(createTableFuture1.isDone()); + + ArgumentCaptor<VersionedUpdate> appendCapture = ArgumentCaptor.forClass(VersionedUpdate.class); + + verify(updateLog).append(appendCapture.capture()); + + int catalogVerAfterTableCreate = appendCapture.getValue().version(); + + CompletableFuture<Integer> createTableFuture2 = manager.execute(createTableCommand); + + verify(createTableCommand, times(2)).get(any()); + + assertFalse(createTableFuture2.isDone()); + + verify(clockWaiter, timeout(10_000).times(2)).waitFor(any()); + + Catalog catalog0 = manager.catalog(manager.latestCatalogVersion()); + + assertNotNull(catalog0); + + HybridTimestamp activationSkew = CatalogUtils.clusterWideEnsuredActivationTsSafeForRoReads( + catalog0, + () -> partitionIdleSafeTimePropagationPeriod, clockService.maxClockSkewMillis()); + + clock.update(activationSkew); + + assertTrue(waitForCondition(createTableFuture1::isDone, 2_000)); + assertTrue(waitForCondition(createTableFuture2::isDone, 2_000)); + + assertSame(manager.schema(catalogVerAfterTableCreate), manager.activeSchema(clock.nowLong())); + } finally { + manager.stop(); + } + } + @Test public void catalogServiceManagesUpdateLogLifecycle() throws Exception { UpdateLog updateLogMock = mock(UpdateLog.class);