This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 26051c4 IGNITE-14238 Creating and destroying tables. Fixes #112 26051c4 is described below commit 26051c4bb693b17e959214b403de14e474b5d306 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Tue May 4 12:46:38 2021 +0300 IGNITE-14238 Creating and destroying tables. Fixes #112 Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com> --- .../ignite/internal/affinity/AffinityManager.java | 7 +- .../apache/ignite/table/manager/IgniteTables.java | 7 + .../org/apache/ignite/internal/manager/Event.java | 25 +++ .../ignite/internal/manager/EventParameters.java | 26 +++ .../apache/ignite/internal/manager/Producer.java | 69 ++++++ .../apache/ignite/internal/util/ArrayUtils.java | 5 - .../java/org/apache/ignite/internal/raft/Loza.java | 18 +- .../ignite/internal/table/InternalTable.java | 8 + .../apache/ignite/internal/table/TableImpl.java | 18 ++ .../ignite/internal/table/TableSchemaViewImpl.java | 52 +++++ .../internal/table/distributed/TableManager.java | 239 +++++++++++++-------- .../distributed/storage/InternalTableImpl.java | 13 ++ .../ignite/internal/table/event/TableEvent.java | 31 +++ .../internal/table/event/TableEventParameters.java | 95 ++++++++ .../ignite/table/impl/DummyInternalTableImpl.java | 6 + 15 files changed, 522 insertions(+), 97 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java index 937af33..82df3f8 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java @@ -27,7 +27,6 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration; import org.apache.ignite.configuration.schemas.table.TablesConfiguration; import org.apache.ignite.internal.baseline.BaselineManager; import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.ByteArray; @@ -134,7 +133,9 @@ public class AffinityManager { affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() { @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) { for (WatchEvent evt : events) { - if (ArrayUtils.empty(evt.newEntry().value())) { + byte[] assignmentVal = evt.newEntry().value(); + + if (assignmentVal != null && assignmentVal.length == 0) { String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length()); String placeholderValue = keyTail.substring(0, keyTail.indexOf('.')); @@ -150,7 +151,7 @@ public class AffinityManager { .tables().get(name).replicas().value(); metaStorageMgr.invoke(evt.newEntry().key(), - Conditions.value().eq(evt.newEntry().value()), + Conditions.value().eq(assignmentVal), Operations.put(ByteUtils.toBytes( RendezvousAffinityFunction.assignPartitions( baselineMgr.nodes(), diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java index 24b20f6..780e9f1 100644 --- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java +++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java @@ -40,6 +40,13 @@ public interface IgniteTables { Table createTable(String name, Consumer<TableChange> tableInitChange); /** + * Drops a table with the name specified. + * + * @param name Table name. + */ + void dropTable(String name); + + /** * Gets a list of all started tables. * * @return List of tables. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java new file mode 100644 index 0000000..c245b5b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.manager; + +/** + * The event cas whcih is produced by event producer component. + * @see Producer#onEvent(Event, EventParameters, Exception) + */ +public interface Event { +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java new file mode 100644 index 0000000..140047d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.manager; + +/** + * Event parameters. + * This type passed to the event listener. + * @see Producer#onEvent(Event, EventParameters, Exception) + */ +public interface EventParameters { +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java new file mode 100644 index 0000000..c198bba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.manager; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiPredicate; + +/** + * Interface which can produce its events. + */ +public abstract class Producer<T extends Event, P extends EventParameters> { + /** All listeners. */ + private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Exception>>> listeners = new ConcurrentHashMap<>(); + + /** + * Registers an event listener. + * When the event predicate returns true it would never invoke after, + * otherwise this predicate would receive an event again. + * + * @param evt Event. + * @param closure Closure. + */ + public void listen(T evt, BiPredicate<P, Exception> closure) { + listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()) + .offer(closure); + } + + /** + * Notifies every listener that subscribed before. + * + * @param evt Event type. + * @param params Event parameters. + * @param err Exception when it was happened, or {@code null} otherwise. + */ + protected void onEvent(T evt, P params, Exception err) { + ConcurrentLinkedQueue<BiPredicate<P, Exception>> queue = listeners.get(evt); + + if (queue == null) + return; + + BiPredicate<P, Exception> closure; + + Iterator<BiPredicate<P, Exception>> iter = queue.iterator(); + + while (iter.hasNext()) { + closure = iter.next(); + + if (closure.test(params, err)) + iter.remove(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java index dcbb3d4..0457241 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java @@ -179,11 +179,6 @@ public final class ArrayUtils { } }; - /** */ - public static boolean empty(byte[] arr) { - return arr == null || arr.length == 0; - } - /** * Stub. */ diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 3d2ce2c..3444091 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -70,9 +70,10 @@ public class Loza { * @return A RAFT group client. */ public RaftGroupService startRaftGroup(String groupId, List<ClusterNode> peers, RaftGroupCommandListener lsnr) { - assert peers.size() > 1; + assert !peers.isEmpty(); //Now we are using only one node in a raft group. + //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol. if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name())) raftServer.setListener(groupId, lsnr); @@ -86,4 +87,19 @@ public class Loza { DELAY ); } + + /** + * Stops a RAFT group. + * + * @param groupId RAFT group id. + * @param peers Group peers. + */ + public void stopRaftGroup(String groupId, List<ClusterNode> peers) { + assert !peers.isEmpty(); + + //Now we are using only one node in a raft group. + //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol. + if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name())) + raftServer.clearListener(groupId); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 1bddfed..4134e33 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.schema.BinaryRow; import org.jetbrains.annotations.NotNull; @@ -28,6 +29,13 @@ import org.jetbrains.annotations.NotNull; */ public interface InternalTable { /** + * Gets a table id. + * + * @return Table id as UUID. + */ + @NotNull UUID tableId(); + + /** * Asynchronously gets a row with same key columns values as given one from the table. * * @param keyRow Row with key columns set. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index cc70462..3fd11de 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -57,6 +57,24 @@ public class TableImpl extends AbstractTableView implements Table { marsh = new TupleMarshallerImpl(schemaMgr); } + /** + * Gets an internal table associated with the table. + * + * @return Internal table. + */ + public @NotNull InternalTable internalTable() { + return tbl; + } + + /** + * Gets a schema view for the table. + * + * @return Schema view. + */ + public TableSchemaView schemaView() { + return schemaMgr; + } + /** {@inheritDoc} */ @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) { return new RecordViewImpl<>(tbl, schemaMgr, recMapper); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java new file mode 100644 index 0000000..dd74868 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.UUID; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaManager; + +/** + * Schema view implementation. + */ +public class TableSchemaViewImpl implements TableSchemaView { + /** Table identifier. */ + private final UUID tableId; + + /** Schema manager. */ + private final SchemaManager schemaManager; + + /** + * @param tableId Table identifier. + * @param schemaManager Schema manager. + */ + public TableSchemaViewImpl(UUID tableId, SchemaManager schemaManager) { + this.tableId = tableId; + this.schemaManager = schemaManager; + } + + /** {@inheritDoc} */ + @Override public SchemaDescriptor schema() { + return schemaManager.schema(tableId); + } + + /** {@inheritDoc} */ + @Override public SchemaDescriptor schema(int ver) { + return schemaManager.schema(tableId, ver); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 86a04e4..1d65043 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -19,13 +19,16 @@ package org.apache.ignite.internal.table.distributed; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.function.BiPredicate; import java.util.function.Consumer; import org.apache.ignite.configuration.internal.ConfigurationManager; import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration; @@ -33,15 +36,16 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration; import org.apache.ignite.configuration.schemas.table.TableChange; import org.apache.ignite.configuration.schemas.table.TableView; import org.apache.ignite.configuration.schemas.table.TablesConfiguration; +import org.apache.ignite.internal.manager.Producer; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.table.TableSchemaView; +import org.apache.ignite.internal.table.TableSchemaViewImpl; import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; -import org.apache.ignite.internal.util.ArrayUtils; +import org.apache.ignite.internal.table.event.TableEvent; +import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.lang.ByteArray; @@ -60,7 +64,7 @@ import org.jetbrains.annotations.NotNull; /** * Table manager. */ -public class TableManager implements IgniteTables { +public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables { /** The logger. */ private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class); @@ -77,13 +81,14 @@ public class TableManager implements IgniteTables { private CompletableFuture<Long> tableCreationSubscriptionFut; /** Tables. */ - private Map<String, Table> tables; + private Map<String, TableImpl> tables = new ConcurrentHashMap<>(); - /** + /* * @param configurationMgr Configuration manager. * @param metaStorageMgr Meta storage manager. * @param schemaManager Schema manager. * @param raftMgr Raft manager. + * @param vaultManager Vault manager. */ public TableManager( ConfigurationManager configurationMgr, @@ -92,8 +97,6 @@ public class TableManager implements IgniteTables { Loza raftMgr, VaultManager vaultManager ) { - tables = new HashMap<>(); - this.configurationMgr = configurationMgr; this.metaStorageMgr = metaStorageMgr; @@ -123,52 +126,57 @@ public class TableManager implements IgniteTables { tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() { @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) { for (WatchEvent evt : events) { - if (!ArrayUtils.empty(evt.newEntry().value())) { - String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length()); - - String placeholderValue = keyTail.substring(0, keyTail.indexOf('.')); - - UUID tblId = UUID.fromString(placeholderValue); - - try { - String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8); - - int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) - .tables().get(name).partitions().value(); - - List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes( - evt.newEntry().value()); - - HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions); - - for (int p = 0; p < partitions; p++) { - partitionMap.put(p, raftMgr.startRaftGroup( - name + "_part_" + p, - assignment.get(p), - new PartitionCommandListener() - )); - } - - tables.put(name, new TableImpl( - new InternalTableImpl( - tblId, - partitionMap, - partitions - ), - new TableSchemaView() { - @Override public SchemaDescriptor schema() { - return schemaManager.schema(tblId); - } - - @Override public SchemaDescriptor schema(int ver) { - return schemaManager.schema(tblId, ver); - } - })); - } - catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to start table [key={}]", - evt.newEntry().key(), e); + String placeholderValue = evt.newEntry().key().toString().substring(tableInternalPrefix.length() - 1); + + String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + placeholderValue)) + .join().value(), StandardCharsets.UTF_8); + + UUID tblId = UUID.fromString(placeholderValue); + + if (evt.newEntry().value() == null) { + assert evt.oldEntry().value() != null : "Previous assignment is unknown"; + + List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes( + evt.oldEntry().value()); + + int partitions = assignment.size(); + + for (int p = 0; p < partitions; p++) + raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p)); + + TableImpl table = tables.get(name); + + assert table != null : "There is no table with the name specified [name=" + name + ']'; + + onEvent(TableEvent.DROP, new TableEventParameters( + tblId, + name, + table.schemaView(), + table.internalTable() + ), null); + } + else if (evt.newEntry().value().length > 0) { + List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes( + evt.newEntry().value()); + + int partitions = assignment.size(); + + HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions); + + for (int p = 0; p < partitions; p++) { + partitionMap.put(p, raftMgr.startRaftGroup( + raftGroupName(tblId, p), + assignment.get(p), + new PartitionCommandListener() + )); } + + onEvent(TableEvent.CREATE, new TableEventParameters( + tblId, + name, + new TableSchemaViewImpl(tblId, schemaManager), + new InternalTableImpl(tblId, partitionMap, partitions) + ), null); } } @@ -182,6 +190,17 @@ public class TableManager implements IgniteTables { } /** + * Compounds a RAFT group unique name. + * + * @param tableId Table identifier. + * @param partition Muber of table partition. + * @return A RAFT group name. + */ + @NotNull private String raftGroupName(UUID tableId, int partition) { + return tableId + "_part_" + partition; + } + + /** * Checks whether the local node hosts Metastorage. * * @param localNodeName Local node uniq name. @@ -208,39 +227,52 @@ public class TableManager implements IgniteTables { //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) .tables().listen(ctx -> { - HashSet<String> tblNamesToStart = new HashSet<>(ctx.newValue().namedListKeys()); + Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ? + Collections.EMPTY_SET : ctx.newValue().namedListKeys(); + + tablesToStart.removeAll(ctx.oldValue().namedListKeys()); long revision = ctx.storageRevision(); - if (ctx.oldValue() != null) - tblNamesToStart.removeAll(ctx.oldValue().namedListKeys()); + List<CompletableFuture<Boolean>> futs = new ArrayList<>(); - for (String tblName : tblNamesToStart) { + for (String tblName : tablesToStart) { TableView tableView = ctx.newValue().get(tblName); long update = 0; UUID tblId = new UUID(revision, update); - CompletableFuture<Boolean> fut = metaStorageMgr.invoke( + futs.add(metaStorageMgr.invoke( new Key(INTERNAL_PREFIX + tblId.toString()), Conditions.value().eq(null), Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)), - Operations.noop()); + Operations.noop()).thenCompose(res -> + res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]) + .thenApply(v -> true) + : CompletableFuture.completedFuture(false))); + } - try { - if (fut.get()) { - metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]); + Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ? + Collections.EMPTY_SET : ctx.oldValue().namedListKeys(); - LOG.info("Table manager created a table [name={}, revision={}]", - tableView.name(), revision); - } - } - catch (InterruptedException | ExecutionException e) { - LOG.error("Table was not fully initialized [name={}, revision={}]", - tableView.name(), revision, e); - } + tablesToStop.removeAll(ctx.newValue().namedListKeys()); + + for (String tblName : tablesToStop) { + TableImpl t = tables.get(tblName); + + UUID tblId = t.internalTable().tableId(); + + futs.add(metaStorageMgr.invoke( + new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), + Conditions.value().ne(null), + Operations.remove(), + Operations.noop()).thenCompose(res -> + res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString())) + .thenApply(v -> true) + : CompletableFuture.completedFuture(false))); } - return CompletableFuture.completedFuture(null); + + return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)); }); } @@ -265,31 +297,62 @@ public class TableManager implements IgniteTables { /** {@inheritDoc} */ @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) { + CompletableFuture<Table> tblFut = new CompletableFuture<>(); + + listen(TableEvent.CREATE, (params, e) -> { + String tableName = params.tableName(); + + if (!name.equals(tableName)) + return false; + + if (e == null) { + tblFut.complete(tables.compute(tableName, (key, val) -> + new TableImpl(params.internalTable(), params.tableSchemaView()))); + } + else + tblFut.completeExceptionally(e); + + return true; + }); + configurationMgr.configurationRegistry() .getConfiguration(TablesConfiguration.KEY).tables().change(change -> change.create(name, tableInitChange)); -// this.createTable("tbl1", change -> { -// change.initReplicas(2); -// change.initName("tbl1"); -// change.initPartitions(1_000); -// }); + return tblFut.join(); + } - //TODO: IGNITE-14646 Support asynchronous table creation - Table tbl = null; + /** {@inheritDoc} */ + @Override public void dropTable(String name) { + CompletableFuture<Void> dropTblFut = new CompletableFuture<>(); - while (tbl == null) { - try { - Thread.sleep(50); + listen(TableEvent.DROP, new BiPredicate<TableEventParameters, Exception>() { + @Override public boolean test(TableEventParameters params, Exception e) { + String tableName = params.tableName(); - tbl = table(name); - } - catch (InterruptedException e) { - LOG.error("Waiting of creation of table was interrupted.", e); + if (!name.equals(tableName)) + return false; + + if (e == null) { + Table droppedTable = tables.remove(tableName); + + assert droppedTable != null; + + dropTblFut.complete(null); + } + else + dropTblFut.completeExceptionally(e); + + return true; } - } + }); + + configurationMgr.configurationRegistry() + .getConfiguration(TablesConfiguration.KEY).tables().change(change -> { + change.delete(name); + }); - return tbl; + dropTblFut.join(); } /** {@inheritDoc} */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 2ff5e8b..c7dcfa3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -42,6 +42,9 @@ public class InternalTableImpl implements InternalTable { /** Partitions. */ private int partitions; + /** Table identifier. */ + private UUID tableId; + /** * @param tableId Table id. * @param partMap Map partition id to raft group. @@ -52,10 +55,20 @@ public class InternalTableImpl implements InternalTable { Map<Integer, RaftGroupService> partMap, int partitions ) { + this.tableId = tableId; this.partitionMap = partMap; this.partitions = partitions; } + /** + * Gets a table id. + * + * @return Table id as UUID. + */ + @Override public @NotNull UUID tableId() { + return tableId; + } + /** {@inheritDoc} */ @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) { return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow)) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java new file mode 100644 index 0000000..f2a737e --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.event; + +import org.apache.ignite.internal.manager.Event; + +/** + * Table management events. + */ +public enum TableEvent implements Event { + /** This event is fired when a table was created. */ + CREATE, + + /** This event is fired when a table was dropped. */ + DROP +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java new file mode 100644 index 0000000..853617d --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.event; + +import java.util.UUID; +import org.apache.ignite.internal.manager.EventParameters; +import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.TableSchemaView; + +/** + * Table event parameters. + * There are properties which associate with a concrete table. + */ +public class TableEventParameters implements EventParameters { + /** Table identifier. */ + private final UUID tableId; + + /** Table name. */ + private final String tableName; + + /** Table schema view. */ + private final TableSchemaView tableSchemaView; + + /** Internal table. */ + private final InternalTable internalTable; + + /** + * @param tableId Table identifier. + * @param tableName Table name. + * @param tableSchemaView Table schema view. + * @param internalTable Internal table. + */ + public TableEventParameters( + UUID tableId, + String tableName, + TableSchemaView tableSchemaView, + InternalTable internalTable + ) { + this.tableId = tableId; + this.tableName = tableName; + this.tableSchemaView = tableSchemaView; + this.internalTable = internalTable; + } + + /** + * Get the table identifier. + * + * @return Table id. + */ + public UUID tableId() { + return tableId; + } + + /** + * Gets the table name. + * + * @return Table name. + */ + public String tableName() { + return tableName; + } + + /** + * Gets a schema view for the table. + * + * @return Schema descriptor. + */ + public TableSchemaView tableSchemaView() { + return tableSchemaView; + } + + /** + * Gets an internal table associated with the table. + * + * @return Internal table. + */ + public InternalTable internalTable() { + return internalTable; + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java index 5e9a352..23d46c4 100644 --- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -77,6 +78,11 @@ public class DummyInternalTableImpl implements InternalTable { } /** {@inheritDoc} */ + @Override public @NotNull UUID tableId() { + return UUID.randomUUID(); + } + + /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> get(@NotNull BinaryRow row) { assert row != null;