denis-chudov commented on a change in pull request #678:
URL: https://github.com/apache/ignite-3/pull/678#discussion_r811053655
##########
File path:
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
##########
@@ -156,6 +156,10 @@ public synchronized void onTableCreated(
) {
IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName,
IgniteSchema::new);
+ if (table == null || table.schemaView() == null) {
+ System.out.println("");
+ }
+
Review comment:
It seems that this is not needed.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -1147,7 +1236,9 @@ public TableImpl table(UUID id) throws
NodeStoppingException {
return CompletableFuture.completedFuture(null);
}
- var tbl = tablesById.get(id);
+ Map<UUID, TableImpl> tablesById = tablesByIdVv.get().join();
Review comment:
I am not sure that such calls are NPE-safe.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -540,16 +575,52 @@ private void createTableLocally(
schemaRegistry
);
- tables.put(name, table);
- tablesById.put(tblId, table);
+ tablesVv.update(causalityToken, previous -> {
+ var val = previous == null ? new HashMap() : new
HashMap<>(previous);
+
+ val.put(name, table);
+
+ return val;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table
[name={}, id={}]", name, tblId),
+ th);
+ });
+
+ tablesByIdVv.update(causalityToken, previous -> {
+ var val = previous == null ? new HashMap() : new
HashMap<>(previous);
+
+ val.put(tblId, table);
+
+ return val;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table
[name={}, id={}]", name, tblId),
+ th);
+ });
- fireEvent(TableEvent.CREATE, new TableEventParameters(table),
null);
+ completeApiCreateFuture(table);
Review comment:
The signature of `update` allows async completion; maybe we should
rewrite these updates and the final call of `completeApiCreateFuture` via
`thenCombine`? The same applies to `dropTableLocally`.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -540,16 +575,52 @@ private void createTableLocally(
schemaRegistry
);
- tables.put(name, table);
- tablesById.put(tblId, table);
+ tablesVv.update(causalityToken, previous -> {
+ var val = previous == null ? new HashMap() : new
HashMap<>(previous);
+
+ val.put(name, table);
+
+ return val;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table
[name={}, id={}]", name, tblId),
+ th);
+ });
+
+ tablesByIdVv.update(causalityToken, previous -> {
+ var val = previous == null ? new HashMap() : new
HashMap<>(previous);
+
+ val.put(tblId, table);
+
+ return val;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table
[name={}, id={}]", name, tblId),
+ th);
+ });
- fireEvent(TableEvent.CREATE, new TableEventParameters(table),
null);
+ completeApiCreateFuture(table);
+
+ fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, table), null);
} catch (Exception e) {
- fireEvent(TableEvent.CREATE, new TableEventParameters(tblId,
name), e);
+ fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, tblId, name), e);
}
}).join();
}
+ /**
+ * Completes appropriate future to return result from API {@link
TableManager#createTable(String, Consumer)}.
+ *
+ * @param table Table.
+ */
+ private void completeApiCreateFuture(TableImpl table) {
+ CompletableFuture<Table> tblFut = tableCreateFuts.get(table.tableId());
+
+ if (tblFut != null) {
+ tblFut.complete(table);
+
+ tableCreateFuts.values().removeIf(fut -> fut == tblFut);
+ }
+ }
Review comment:
What about using `tableCreateFuts.remove` to get the future?
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -235,41 +244,52 @@ private void
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx
.listenElements(new
ConfigurationNamedListListener<>() {
@Override
public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ long causalityToken =
schemasCtx.storageRevision();
+
if (!busyLock.enterBusy()) {
fireEvent(
TableEvent.ALTER,
- new
TableEventParameters(tblId, tblName),
+ new
TableEventParameters(schemasCtx.storageRevision(), tblId, tblName),
Review comment:
Please replace `schemasCtx.storageRevision()` with `causalityToken`, which is
declared above, as in the other places where `fireEvent` is called.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -708,50 +797,50 @@ public Table createTable(String name,
Consumer<TableChange> tableInitChange) {
}
change.create(name, (ch) -> {
- tableInitChange.accept(ch);
-
- ((ExtendedTableChange) ch)
- // Affinity assignments calculation.
-
.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
- baselineMgr.nodes(),
- ch.partitions(),
- ch.replicas())))
- // Table schema preparation.
- .changeSchemas(schemasCh ->
schemasCh.create(
-
String.valueOf(INITIAL_SCHEMA_VERSION),
- schemaCh -> {
- SchemaDescriptor
schemaDesc;
-
- //TODO IGNITE-15747 Remove
try-catch and force configuration
- // validation here to
ensure a valid configuration passed to
- //
prepareSchemaDescriptor() method.
- try {
- schemaDesc =
SchemaUtils.prepareSchemaDescriptor(
-
((ExtendedTableView) ch).schemas().size(),
- ch);
- } catch
(IllegalArgumentException ex) {
- throw new
ConfigurationValidationException(ex.getMessage());
- }
-
-
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
- }
- ));
- }
- );
- }).whenComplete((res, t) -> {
- if (t != null) {
- Throwable ex = getRootCause(t);
+ tableInitChange.accept(ch);
+
+ var extConfCh = ((ExtendedTableChange) ch);
+
+ tableCreateFuts.put(extConfCh.id(), tblFut);
+
+ // Affinity assignments calculation.
+
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
+ baselineMgr.nodes(),
+ ch.partitions(),
+ ch.replicas())))
+ // Table schema preparation.
+ .changeSchemas(schemasCh -> schemasCh.create(
+ String.valueOf(INITIAL_SCHEMA_VERSION),
+ schemaCh -> {
+ SchemaDescriptor schemaDesc;
+
+ //TODO IGNITE-15747 Remove
try-catch and force configuration
+ // validation here to ensure a
valid configuration passed to
+ // prepareSchemaDescriptor()
method.
+ try {
+ schemaDesc =
SchemaUtils.prepareSchemaDescriptor(
+ ((ExtendedTableView)
ch).schemas().size(),
+ ch);
+ } catch (IllegalArgumentException
ex) {
+ throw new
ConfigurationValidationException(ex.getMessage());
+ }
- if (ex instanceof TableAlreadyExistsException) {
- tblFut.completeExceptionally(ex);
- } else {
- LOG.error(IgniteStringFormatter.format("Table
wasn't created [name={}]", name), ex);
+
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
+ }
+ ));
+ });
+ }).exceptionally(t -> {
+ Throwable ex = getRootCause(t);
- tblFut.completeExceptionally(ex);
- }
+ if (ex instanceof TableAlreadyExistsException) {
+ tblFut.completeExceptionally(ex);
} else {
- tblFut.complete(tables.get(name));
+ LOG.error(IgniteStringFormatter.format("Table wasn't
created [name={}]", name), ex);
+
+ tblFut.completeExceptionally(ex);
Review comment:
`tblFut` should also be removed from `tableCreateFuts`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]