This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e7df5e28ec IGNITE-17686 Revisit and fix exception handling in futures chains in sql-engine module (#1118)
e7df5e28ec is described below

commit e7df5e28ec228e93af9eba831c1f495071a3a42b
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Oct 3 16:42:26 2022 +0300

    IGNITE-17686 Revisit and fix exception handling in futures chains in sql-engine module (#1118)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  1 +
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |  4 +++
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 30 ++++++++++------------
 3 files changed, 19 insertions(+), 16 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 7009e39cb2..d487f57ada 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -473,6 +473,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                     .parameters(params)
                     .build();
 
+            // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix query execution flow.
             CompletableFuture<AsyncSqlCursor<List<Object>>> stage = 
start.thenCompose(none -> prepareSvc.prepareAsync(sqlNode, ctx))
                     .thenApply(plan -> {
                         context.maybeUnwrap(QueryValidator.class)
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index d0a2739d31..ca84b5d3b0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -308,6 +308,8 @@ public class DdlCommandHandler {
         return tableManager.alterTableAsync(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
+                    ret.set(true); // Reset state if the closure has been restarted.
+
                     Map<String, String> colNamesToOrders = 
columnOrdersToNames(chng.columns());
 
                     List<ColumnDefinition> colsDef0;
@@ -388,6 +390,8 @@ public class DdlCommandHandler {
         return tableManager.alterTableAsync(
                         fullName,
                         chng -> chng.changeColumns(cols -> {
+                            ret.set(true); // Reset state if the closure has been restarted.
+
                             PrimaryKeyView priKey = chng.primaryKey();
 
                             Map<String, String> colNamesToOrders = 
columnOrdersToNames(chng.columns());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index ff3822adf7..503721920b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -227,7 +227,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                                     Map<UUID, InternalIgniteTable> resTbls = 
new HashMap<>(tables);
 
                                     return igniteTableFuture
-                                            .thenApply(igniteTable -> 
inBusyLock(busyLock, () -> {
+                                            .thenApply(igniteTable -> {
                                                 InternalIgniteTable oldTable = 
resTbls.put(igniteTable.id(), igniteTable);
 
                                                 // looks like this is UPDATE 
operation
@@ -238,17 +238,13 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                                                 }
 
                                                 return resTbls;
-                                            }));
+                                            });
                                 }))
-                        .thenCombine(
-                                igniteTableFuture,
-                                (v, igniteTable) -> inBusyLock(busyLock, () -> 
{
-                                            
schema.addTable(objectSimpleName(schemaName, table.name()), igniteTable);
-
-                                            return null;
-                                        }
-                                )).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+                        .thenCombine(igniteTableFuture, (v, igniteTable) -> {
+                            schema.addTable(objectSimpleName(schemaName, 
table.name()), igniteTable);
 
+                            return res;
+                        });
             }));
 
             return calciteSchemaVv.get(causalityToken);
@@ -309,7 +305,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                         resTbls.remove(table.id());
 
                         return completedFuture(resTbls);
-                    })).thenCompose(tables -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+                    })).thenCompose(tables -> completedFuture(res));
                 }
 
                 return completedFuture(res);
@@ -436,15 +432,17 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                                         return 
CompletableFuture.completedFuture(resIdxs);
                                     })
-                            ).thenRun(() -> inBusyLock(busyLock, () -> {
+                            ).thenCompose(ignore -> {
                                 String tblName = tableNameById(schema, 
index.tableId());
 
                                 table.addIndex(schemaIndex);
                                 schema.addTable(tblName, table);
                                 schema.addIndex(index.id(), schemaIndex);
-                            })).thenCompose(ignored -> inBusyLock(busyLock, () 
-> completedFuture(resTbls)));
+
+                                return completedFuture(resTbls);
+                            });
                         })
-                ).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+                ).thenCompose(v -> completedFuture(res));
             }));
 
             return calciteSchemaVv.get(causalityToken);
@@ -518,9 +516,9 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                                             return completedFuture(resIdxs);
                                         }
-                                )).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(resTbls)));
+                                )).thenCompose(v -> completedFuture(resTbls));
                             })
-                    ).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+                    ).thenCompose(v -> completedFuture(res));
                 }
 
                 return completedFuture(res);

Reply via email to