tkalkirill commented on code in PR #2500:
URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1319416312
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2013,94 +1589,67 @@ public CompletableFuture<TableImpl>
tableImplAsync(String name) {
* @return Future representing pending completion of the {@code
TableManager#tableAsyncInternal} operation.
*/
public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
- }
+ return inBusyLockAsync(busyLock, () -> {
+ HybridTimestamp now = clock.now();
- try {
- return supplyAsync(() -> inBusyLock(busyLock, () ->
directTableId(name)), ioExecutor)
- .thenCompose(tableId -> inBusyLock(busyLock, () -> {
- if (tableId == null) {
+ return anyOf(schemaSyncService.waitForMetadataCompleteness(now),
stopManagerFuture)
+ .thenComposeAsync(unused -> inBusyLockAsync(busyLock, ()
-> {
+ CatalogTableDescriptor tableDescriptor =
catalogService.table(name, now.longValue());
+
+ // Check if the table has been deleted.
+ if (tableDescriptor == null) {
return completedFuture(null);
}
- return tableAsyncInternal(tableId, false);
+ return tableAsyncInternalBusy(tableDescriptor.id());
}));
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
- /**
- * Internal method for getting table by id.
- *
- * @param id Table id.
- * @param checkConfiguration {@code True} when the method checks a
configuration before trying to get a table, {@code false}
- * otherwise.
- * @return Future representing pending completion of the operation.
- */
- public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean
checkConfiguration) {
- CompletableFuture<Boolean> tblCfgFut = checkConfiguration
- ? supplyAsync(() -> inBusyLock(busyLock, () ->
isTableConfigured(id)), ioExecutor)
- : completedFuture(true);
-
- return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
- if (!isCfg) {
- return completedFuture(null);
- }
-
- TableImpl tbl = latestTablesById().get(id);
+ private CompletableFuture<TableImpl> tableAsyncInternalBusy(int tableId) {
+ TableImpl tableImpl = latestTablesById().get(tableId);
- if (tbl != null) {
- return completedFuture(tbl);
- }
+ if (tableImpl != null) {
+ return completedFuture(tableImpl);
+ }
- CompletableFuture<TableImpl> getTblFut = new CompletableFuture<>();
+ CompletableFuture<TableImpl> getLatestTableFuture = new
CompletableFuture<>();
- CompletionListener<Void> tablesListener = (token, v, th) -> {
- if (th == null) {
- CompletableFuture<Map<Integer, TableImpl>> tablesFut =
tablesByIdVv.get(token);
+ CompletionListener<Void> tablesListener = (token, v, th) -> {
+ if (th == null) {
+ CompletableFuture<Map<Integer, TableImpl>> tablesFuture =
tablesByIdVv.get(token);
- tablesFut.whenComplete((tables, e) -> {
- if (e != null) {
- getTblFut.completeExceptionally(e);
- } else {
- TableImpl table = tables.get(id);
+ tablesFuture.whenComplete((tables, e) -> {
+ if (e != null) {
+ getLatestTableFuture.completeExceptionally(e);
+ } else {
+ TableImpl table = tables.get(tableId);
- if (table != null) {
- getTblFut.complete(table);
- }
+ if (table != null) {
+ getLatestTableFuture.complete(table);
}
- });
- } else {
- getTblFut.completeExceptionally(th);
- }
- };
-
- assignmentsUpdatedVv.whenComplete(tablesListener);
+ }
+ });
+ } else {
+ getLatestTableFuture.completeExceptionally(th);
+ }
+ };
- // This check is needed for the case when we have registered
tablesListener,
- // but tablesByIdVv has already been completed, so listener would
be triggered only for the next versioned value update.
- tbl = latestTablesById().get(id);
+ assignmentsUpdatedVv.whenComplete(tablesListener);
- if (tbl != null) {
- assignmentsUpdatedVv.removeWhenComplete(tablesListener);
+ // This check is needed for the case when we have registered
tablesListener,
+ // but tablesByIdVv has already been completed, so listener would be
triggered only for the next versioned value update.
+ tableImpl = latestTablesById().get(tableId);
- return completedFuture(tbl);
- }
+ if (tableImpl != null) {
+ assignmentsUpdatedVv.removeWhenComplete(tablesListener);
- return getTblFut.whenComplete((unused, throwable) ->
assignmentsUpdatedVv.removeWhenComplete(tablesListener));
- }));
- }
+ return completedFuture(tableImpl);
+ }
- /**
- * Checks that the table is configured with specific id.
- *
- * @param id Table id.
- * @return True when the table is configured into cluster, false otherwise.
- */
- private boolean isTableConfigured(int id) {
- return configuredTablesCache.isTableConfigured(id);
+ return anyOf(getLatestTableFuture, stopManagerFuture)
+ .thenComposeAsync(o -> getLatestTableFuture, ioExecutor)
Review Comment:
> what's the point of using thenComposeAsync instead of thenCompose here?
I thought it would be better if, once one of the futures from `anyOf`
completes, the subsequent processing of the futures continued in the inner
pool, so as not to block the threads that complete the futures. But maybe I'm
overthinking it, wdyt?
> Also, what's the point of anyOf above, which uses the same future, if we
still need to wait for it?
Let me try to explain: if we call `busyLock#enterBusy` somewhere and do not
release the lock for some time, then, in order to avoid blocking in
`TableManager#stop`, we simply complete the future with an error
immediately.
We discussed the decision with @sanpwc and decided to keep it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]