This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c9a1ab9 IGNITE-14864: Schema update. Merge multiple converters stages. (#194) c9a1ab9 is described below commit c9a1ab902045fda232c0caac89f4f682a62e5e24 Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com> AuthorDate: Fri Jul 16 14:16:27 2021 +0300 IGNITE-14864: Schema update. Merge multiple converters stages. (#194) --- .../runner/app/AbstractSchemaChangeTest.java | 123 +++++++++++- .../runner/app/DynamicTableCreationTest.java | 67 +++---- .../runner/app/SchemaChangeKVViewTest.java | 137 +++++++++++++ .../runner/app/SchemaChangeTableViewTest.java | 174 +++++++++++++++++ .../internal/runner/app/TableCreationTest.java | 212 ++++++++++----------- .../apache/ignite/internal/schema/NativeTypes.java | 1 + .../ignite/internal/schema/SchemaManager.java | 38 ---- .../schema/builder/SchemaTableBuilderImpl.java | 3 + .../internal/schema/mapping/ColumnMapping.java | 33 ++++ .../schema/registry/SchemaRegistryImpl.java | 54 +++++- .../schema/registry/UpgradingRowAdapter.java | 6 +- .../{ => registry}/SchemaRegistryImplTest.java | 66 ++++++- .../internal/table/distributed/TableManager.java | 2 +- 13 files changed, 720 insertions(+), 196 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java index 3788b5a..8070888 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.ignite.app.Ignite; import org.apache.ignite.app.IgnitionManager; +import 
org.apache.ignite.internal.schema.InvalidTypeException; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; @@ -33,6 +35,9 @@ import org.apache.ignite.schema.SchemaBuilders; import org.apache.ignite.schema.SchemaTable; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert; @@ -94,6 +99,90 @@ abstract class AbstractSchemaChangeTest { } /** + * Check unsupported column type change. + */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056") + @Test + public void testChangeColumnType() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Assertions.assertThrows(InvalidTypeException.class, () -> { + grid.get(0).tables().alterTable(TABLE, + tblChanger -> tblChanger.changeColumns(cols -> { + final String colKey = tblChanger.columns().namedListKeys().stream() + .filter(c -> "valInt".equals(tblChanger.columns().get(c).name())) + .findFirst() + .orElseThrow(() -> { + throw new IllegalStateException("Column not found."); + }); + + tblChanger.changeColumns(listChanger -> + listChanger.update(colKey, colChanger -> colChanger.changeType(c -> c.changeType("STRING"))) + ); + }) + ); + }); + } + + /** + * Check unsupported column nullability change. 
+ */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056") + @Test + public void testMakeColumnNonNullable() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Assertions.assertThrows(InvalidTypeException.class, () -> { + grid.get(0).tables().alterTable(TABLE, + tblChanger -> tblChanger.changeColumns(cols -> { + final String colKey = tblChanger.columns().namedListKeys().stream() + .filter(c -> "valInt".equals(tblChanger.columns().get(c).name())) + .findFirst() + .orElseThrow(() -> { + throw new IllegalStateException("Column not found."); + }); + + tblChanger.changeColumns(listChanger -> + listChanger.update(colKey, colChanger -> colChanger.changeNullable(false)) + ); + }) + ); + }); + } + + /** + * Check unsupported nullability change. + */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056") + @Test + public void testMakeColumnsNullable() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Assertions.assertThrows(InvalidTypeException.class, () -> { + grid.get(0).tables().alterTable(TABLE, + tblChanger -> tblChanger.changeColumns(cols -> { + final String colKey = tblChanger.columns().namedListKeys().stream() + .filter(c -> "valStr".equals(tblChanger.columns().get(c).name())) + .findFirst() + .orElseThrow(() -> { + throw new IllegalStateException("Column not found."); + }); + + tblChanger.changeColumns(listChanger -> + listChanger.update(colKey, colChanger -> colChanger.changeNullable(true)) + ); + }) + ); + }); + } + + /** * @return Grid nodes. 
*/ @NotNull protected List<Ignite> startGrid() { @@ -131,7 +220,8 @@ abstract class AbstractSchemaChangeTest { int colIdx = chng.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt() + 1; cols.create(String.valueOf(colIdx), colChg -> convert(columnToAdd, colChg)); - })); + }) + ); } /** @@ -146,8 +236,10 @@ abstract class AbstractSchemaChangeTest { .findAny() .orElseThrow(() -> { throw new IllegalStateException("Column not found."); - })); - })); + }) + ); + }) + ); } /** @@ -168,6 +260,29 @@ abstract class AbstractSchemaChangeTest { tblChanger.changeColumns(listChanger -> listChanger.update(colKey, colChanger -> colChanger.changeName(newName)) ); - })); + }) + ); + } + + /** + * @param nodes Cluster nodes. + * @param colName Column name. + * @param defSup Default value supplier. + */ + protected void changeDefault(List<Ignite> nodes, String colName, Supplier<Object> defSup) { + nodes.get(0).tables().alterTable(TABLE, + tblChanger -> tblChanger.changeColumns(cols -> { + final String colKey = tblChanger.columns().namedListKeys().stream() + .filter(c -> colName.equals(tblChanger.columns().get(c).name())) + .findFirst() + .orElseThrow(() -> { + throw new IllegalStateException("Column not found."); + }); + + tblChanger.changeColumns(listChanger -> + listChanger.update(colKey, colChanger -> colChanger.changeDefaultValue(defSup.get().toString())) + ); + }) + ); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java index 884e7ff..575db70 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java @@ -25,13 +25,11 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.app.Ignite; 
import org.apache.ignite.app.IgnitionManager; -import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter; +import org.apache.ignite.internal.table.SchemaMismatchException; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; -import org.apache.ignite.internal.table.SchemaMismatchException; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.lang.IgniteLogger; import org.apache.ignite.schema.ColumnType; import org.apache.ignite.schema.SchemaBuilders; import org.apache.ignite.schema.SchemaTable; @@ -53,41 +51,38 @@ import static org.junit.jupiter.api.Assertions.assertThrows; @Disabled("https://issues.apache.org/jira/browse/IGNITE-14581") @ExtendWith(WorkDirectoryExtension.class) class DynamicTableCreationTest { - /** The logger. */ - private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class); - /** Nodes bootstrap configuration. 
*/ private final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {{ - put("node0", "{\n" + - " \"node\": {\n" + - " \"metastorageNodes\":[ \"node0\" ]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3344,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " }\n" + - "}"); - - put("node1", "{\n" + - " \"node\": {\n" + - " \"metastorageNodes\":[ \"node0\" ]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3345,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " }\n" + - "}"); - - put("node2", "{\n" + - " \"node\": {\n" + - " \"metastorageNodes\":[ \"node0\"]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3346,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " }\n" + - "}"); - }}; + put("node0", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3344,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + + put("node1", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3345,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + + put("node2", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\"]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3346,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + }}; /** */ private final List<Ignite> clusterNodes = new ArrayList<>(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java index 944802d..977914b 100644 --- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.runner.app; +import java.io.Serializable; import java.util.List; +import java.util.function.Supplier; import org.apache.ignite.app.Ignite; import org.apache.ignite.internal.table.ColumnNotFoundException; +import org.apache.ignite.schema.Column; import org.apache.ignite.schema.ColumnType; import org.apache.ignite.schema.SchemaBuilders; import org.apache.ignite.table.KeyValueBinaryView; @@ -166,4 +169,138 @@ class SchemaChangeKVViewTest extends AbstractSchemaChangeTest { assertThrows(ColumnNotFoundException.class, () -> kvView.get(keyTuple2).value("valInt")); } } + + /** + * Check merge table schema changes. + */ + @Test + public void testMergeChangesAddDropAdd() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + final Column column = SchemaBuilders.column("val", ColumnType.string()).asNullable().withDefaultValue("default").build(); + + KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView(); + + { + kvView.put(kvView.tupleBuilder().set("key", 1L).build(), + kvView.tupleBuilder().set("valInt", 111).build()); + + assertThrows(ColumnNotFoundException.class, () -> kvView.put( + kvView.tupleBuilder().set("key", 2L).build(), + kvView.tupleBuilder().set("val", "I'not exists").build()) + ); + } + + addColumn(grid, column); + + { + assertNull(kvView.get(kvView.tupleBuilder().set("key", 2L).build())); + + kvView.put(kvView.tupleBuilder().set("key", 2L).build(), + kvView.tupleBuilder().set("valInt", 222).set("val", "string").build()); + + kvView.put(kvView.tupleBuilder().set("key", 3L).build(), + kvView.tupleBuilder().set("valInt", 333).build()); + } + + dropColumn(grid, column.name()); + + { + kvView.put(kvView.tupleBuilder().set("key", 4L).build(), + 
kvView.tupleBuilder().set("valInt", 444).build()); + + assertThrows(ColumnNotFoundException.class, () -> kvView.put( + kvView.tupleBuilder().set("key", 4L).build(), + kvView.tupleBuilder().set("val", "I'm not exist").build()) + ); + } + + addColumn(grid, SchemaBuilders.column("val", ColumnType.string()).asNullable().withDefaultValue("default").build()); + + { + kvView.put(kvView.tupleBuilder().set("key", 5L).build(), + kvView.tupleBuilder().set("valInt", 555).build()); + + // Check old row conversion. + Tuple keyTuple1 = kvView.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)kvView.get(keyTuple1).value("valInt")); + assertEquals("default", kvView.get(keyTuple1).value("val")); + + Tuple keyTuple2 = kvView.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt")); + assertEquals("default", kvView.get(keyTuple2).value("val")); + + Tuple keyTuple3 = kvView.tupleBuilder().set("key", 3L).build(); + + assertEquals(333, (Integer)kvView.get(keyTuple3).value("valInt")); + assertEquals("default", kvView.get(keyTuple3).value("val")); + + Tuple keyTuple4 = kvView.tupleBuilder().set("key", 4L).build(); + + assertEquals(444, (Integer)kvView.get(keyTuple4).value("valInt")); + assertEquals("default", kvView.get(keyTuple4).value("val")); + + Tuple keyTuple5 = kvView.tupleBuilder().set("key", 5L).build(); + + assertEquals(555, (Integer)kvView.get(keyTuple5).value("valInt")); + assertEquals("default", kvView.get(keyTuple5).value("val")); + } + } + + + /** + * Check merge column default value changes. 
+ */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-14896") + @Test + public void testMergeChangesColumnDefault() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + KeyValueBinaryView vkView = grid.get(1).tables().table(TABLE).kvView(); + + final String colName = "valStr"; + + { + vkView.put(vkView.tupleBuilder().set("key", 1L).build(), vkView.tupleBuilder().set("valInt", 111).build()); + } + + changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> "newDefault"); + addColumn(grid, SchemaBuilders.column("val", ColumnType.string()).withDefaultValue("newDefault").build()); + + { + vkView.put(vkView.tupleBuilder().set("key", 2L).build(), vkView.tupleBuilder().set("valInt", 222).build()); + } + + changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> "brandNewDefault"); + changeDefault(grid, "val", (Supplier<Object> & Serializable)() -> "brandNewDefault"); + + { + vkView.put(vkView.tupleBuilder().set("key", 3L).build(), vkView.tupleBuilder().set("valInt", 333).build()); + + // Check old row conversion. 
+ Tuple keyTuple1 = vkView.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)vkView.get(keyTuple1).value("valInt")); + assertEquals("default", vkView.get(keyTuple1).value("valStr")); + assertEquals("newDefault", vkView.get(keyTuple1).value("val")); + + Tuple keyTuple2 = vkView.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)vkView.get(keyTuple2).value("valInt")); + assertEquals("newDefault", vkView.get(keyTuple2).value("valStr")); + assertEquals("newDefault", vkView.get(keyTuple2).value("val")); + + Tuple keyTuple3 = vkView.tupleBuilder().set("key", 3L).build(); + + assertEquals(333, (Integer)vkView.get(keyTuple3).value("valInt")); + assertEquals("brandNewDefault", vkView.get(keyTuple3).value("valStr")); + assertEquals("brandNewDefault", vkView.get(keyTuple3).value("val")); + } + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java index 090ce72..d5836bd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.runner.app; +import java.io.Serializable; import java.util.List; +import java.util.function.Supplier; import org.apache.ignite.app.Ignite; import org.apache.ignite.internal.table.ColumnNotFoundException; +import org.apache.ignite.schema.Column; import org.apache.ignite.schema.ColumnType; import org.apache.ignite.schema.SchemaBuilders; import org.apache.ignite.table.Table; @@ -28,6 +31,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static 
org.junit.jupiter.api.Assertions.assertThrows; /** @@ -158,4 +162,174 @@ class SchemaChangeTableViewTest extends AbstractSchemaChangeTest { assertThrows(ColumnNotFoundException.class, () -> tbl.get(keyTuple2).value("valInt")); } } + + /** + * Rename column then add a new column with same name. + */ + @Test + void testRenameThenAddColumnWithSameName() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Table tbl = grid.get(0).tables().table(TABLE); + + { + tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 111).build()); + + assertThrows(ColumnNotFoundException.class, + () -> tbl.insert(tbl.tupleBuilder().set("key", 2L).set("val2", -222).build()) + ); + } + + renameColumn(grid, "valInt", "val2"); + addColumn(grid, SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().withDefaultValue(-1).build()); + + { + // Check old row conversion. + Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build(); + + assertEquals(1, (Long)tbl.get(keyTuple1).value("key")); + assertEquals(111, (Integer)tbl.get(keyTuple1).value("val2")); + assertEquals(-1, (Integer)tbl.get(keyTuple1).value("valInt")); + + // Check tuple of outdated schema. + assertNull(tbl.get(tbl.tupleBuilder().set("key", 2L).build())); + + // Check tuple of correct schema. + tbl.insert(tbl.tupleBuilder().set("key", 2L).set("val2", 222).build()); + + Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build(); + + assertEquals(2, (Long)tbl.get(keyTuple2).value("key")); + assertEquals(222, (Integer)tbl.get(keyTuple2).value("val2")); + assertEquals(-1, (Integer)tbl.get(keyTuple2).value("valInt")); + } + } + + /** + * Check merge table schema changes. 
+ */ + @Test + public void testMergeChangesAddDropAdd() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + final Column column = SchemaBuilders.column("val", ColumnType.string()).asNullable().withDefaultValue("default").build(); + + Table tbl = grid.get(0).tables().table(TABLE); + + { + tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 111).build()); + + assertThrows(ColumnNotFoundException.class, () -> tbl.insert( + tbl.tupleBuilder().set("key", 2L).set("val", "I'not exists").build()) + ); + } + + addColumn(grid, column); + + { + assertNull(tbl.get(tbl.tupleBuilder().set("key", 2L).build())); + + tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 222).set("val", "string").build()); + + tbl.insert(tbl.tupleBuilder().set("key", 3L).set("valInt", 333).build()); + } + + dropColumn(grid, column.name()); + + { + tbl.insert(tbl.tupleBuilder().set("key", 4L).set("valInt", 444).build()); + + assertThrows(ColumnNotFoundException.class, () -> tbl.insert( + tbl.tupleBuilder().set("key", 4L).set("val", "I'm not exist").build()) + ); + } + + addColumn(grid, SchemaBuilders.column("val", ColumnType.string()).withDefaultValue("default").build()); + + { + tbl.insert(tbl.tupleBuilder().set("key", 5L).set("valInt", 555).build()); + + // Check old row conversion. 
+ Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt")); + assertEquals("default", tbl.get(keyTuple1).value("val")); + + Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt")); + assertEquals("default", tbl.get(keyTuple2).value("val")); + + Tuple keyTuple3 = tbl.tupleBuilder().set("key", 3L).build(); + + assertEquals(333, (Integer)tbl.get(keyTuple3).value("valInt")); + assertEquals("default", tbl.get(keyTuple3).value("val")); + + Tuple keyTuple4 = tbl.tupleBuilder().set("key", 4L).build(); + + assertEquals(444, (Integer)tbl.get(keyTuple4).value("valInt")); + assertEquals("default", tbl.get(keyTuple4).value("val")); + + Tuple keyTuple5 = tbl.tupleBuilder().set("key", 5L).build(); + + assertEquals(555, (Integer)tbl.get(keyTuple5).value("valInt")); + assertEquals("default", tbl.get(keyTuple5).value("val")); + } + } + + /** + * Check merge column default value changes. + */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-14896") + @Test + public void testMergeChangesColumnDefault() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Table tbl = grid.get(0).tables().table(TABLE); + + final String colName = "valStr"; + + { + tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 111).build()); + } + + changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> "newDefault"); + addColumn(grid, SchemaBuilders.column("val", ColumnType.string()).withDefaultValue("newDefault").build()); + + { + tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 222).build()); + } + + changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> "brandNewDefault"); + changeDefault(grid, "val", (Supplier<Object> & Serializable)() -> "brandNewDefault"); + + { + tbl.insert(tbl.tupleBuilder().set("key", 3L).set("valInt", 333).build()); + + // Check old row conversion. 
+ Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt")); + assertEquals("default", tbl.get(keyTuple1).value("valStr")); + assertEquals("newDefault", tbl.get(keyTuple1).value("val")); + + Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt")); + assertEquals("newDefault", tbl.get(keyTuple2).value("valStr")); + assertEquals("newDefault", tbl.get(keyTuple2).value("val")); + + Tuple keyTuple3 = tbl.tupleBuilder().set("key", 3L).build(); + + assertEquals(333, (Integer)tbl.get(keyTuple3).value("valInt")); + assertEquals("brandNewDefault", tbl.get(keyTuple3).value("valStr")); + assertEquals("brandNewDefault", tbl.get(keyTuple3).value("val")); + } + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java index cedb240..03b8b8d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java @@ -47,112 +47,112 @@ import static org.junit.jupiter.api.Assertions.assertNull; class TableCreationTest { /** Nodes bootstrap configuration with preconfigured tables. 
*/ private final LinkedHashMap<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {{ - put("node0", "{\n" + - " \"node\": {\n" + - " \"name\":node0,\n" + - " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3344,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " },\n" + - " \"table\": {\n" + - " \"tables\": {\n" + - " \"tbl1\": {\n" + - " \"partitions\":10,\n" + - " \"replicas\":2,\n" + - " \"columns\": { \n" + - " \"key\": {\n" + - " \"type\": {" + - " \"type\":UUID\n" + - " },\n" + - " \"nullable\":false\n" + - " },\n" + - " \"affKey\": {\n" + - " \"type\": {" + - " \"type\":INT64\n" + - " },\n" + - " \"nullable\":false\n" + - " },\n" + - " \"valString\": {\n" + - " \"type\": {" + - " \"type\":String\n" + - " },\n" + - " \"nullable\":false\n" + - " },\n" + - " \"valInt\": {\n" + - " \"type\": {" + - " \"type\":INT32\n" + - " },\n" + - " \"nullable\":false\n" + - " },\n" + - " \"valNullable\": {\n" + - " \"type\": {" + - " \"type\":String\n" + - " },\n" + - " \"nullable\":true\n" + - " }\n" + - " },\n" + /* Columns. */ - " \"indices\": {\n" + - " \"PK\": {\n" + - " \"type\":PRIMARY,\n" + - " \"columns\": {\n" + - " \"key\": {\n" + - " \"asc\":true\n" + - " },\n" + - " \"affKey\": {}\n" + - " },\n" + /* Columns. */ - " \"affinityColumns\":[ \"affKey\" ]\n" + - " }\n" + - " }\n" + /* Indices. */ - " },\n" + /* Table. */ - "\n" + - " \"tbl2\": {\n" + // Table minimal configuration. - " \"columns\": { \n" + - " \"key\": {\n" + - " \"type\": {" + - " \"type\":INT64\n" + - " },\n" + - " },\n" + - " \"val\": {\n" + - " \"type\": {" + - " \"type\":INT64\n" + - " },\n" + - " }\n" + - " },\n" + /* Columns. */ - " \"indices\": {\n" + - " \"PK\": {\n" + - " \"type\":PRIMARY,\n" + - " \"columns\": {\n" + - " \"key\": {}\n" + - " },\n" + /* Columns. */ - " }\n" + - " }\n" + /* Indices. */ - " }\n" + /* Table. */ - " }\n" + /* Tables. */ - " }\n" + /* Root. 
*/ - "}"); - - put("node1", "{\n" + - " \"node\": {\n" + - " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3345,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " }\n" + - "}"); - - put("node2", "{\n" + - " \"node\": {\n" + - " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + - " },\n" + - " \"network\": {\n" + - " \"port\":3346,\n" + - " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + - " }\n" + - "}"); - }}; + put("node0", "{\n" + + " \"node\": {\n" + + " \"name\":node0,\n" + + " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3344,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " },\n" + + " \"table\": {\n" + + " \"tables\": {\n" + + " \"tbl1\": {\n" + + " \"partitions\":10,\n" + + " \"replicas\":2,\n" + + " \"columns\": { \n" + + " \"key\": {\n" + + " \"type\": {" + + " \"type\":UUID\n" + + " },\n" + + " \"nullable\":false\n" + + " },\n" + + " \"affKey\": {\n" + + " \"type\": {" + + " \"type\":INT64\n" + + " },\n" + + " \"nullable\":false\n" + + " },\n" + + " \"valString\": {\n" + + " \"type\": {" + + " \"type\":String\n" + + " },\n" + + " \"nullable\":false\n" + + " },\n" + + " \"valInt\": {\n" + + " \"type\": {" + + " \"type\":INT32\n" + + " },\n" + + " \"nullable\":false\n" + + " },\n" + + " \"valNullable\": {\n" + + " \"type\": {" + + " \"type\":String\n" + + " },\n" + + " \"nullable\":true\n" + + " }\n" + + " },\n" + /* Columns. */ + " \"indices\": {\n" + + " \"PK\": {\n" + + " \"type\":PRIMARY,\n" + + " \"columns\": {\n" + + " \"key\": {\n" + + " \"asc\":true\n" + + " },\n" + + " \"affKey\": {}\n" + + " },\n" + /* Columns. */ + " \"affinityColumns\":[ \"affKey\" ]\n" + + " }\n" + + " }\n" + /* Indices. */ + " },\n" + /* Table. */ + "\n" + + " \"tbl2\": {\n" + // Table minimal configuration. 
+ " \"columns\": { \n" + + " \"key\": {\n" + + " \"type\": {" + + " \"type\":INT64\n" + + " },\n" + + " },\n" + + " \"val\": {\n" + + " \"type\": {" + + " \"type\":INT64\n" + + " },\n" + + " }\n" + + " },\n" + /* Columns. */ + " \"indices\": {\n" + + " \"PK\": {\n" + + " \"type\":PRIMARY,\n" + + " \"columns\": {\n" + + " \"key\": {}\n" + + " },\n" + /* Columns. */ + " }\n" + + " }\n" + /* Indices. */ + " }\n" + /* Table. */ + " }\n" + /* Tables. */ + " }\n" + /* Root. */ + "}"); + + put("node1", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3345,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + + put("node2", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3346,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + }}; /** */ private final List<Ignite> clusterNodes = new ArrayList<>(); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java index 1d5bedb..3fc2143 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java @@ -90,6 +90,7 @@ public class NativeTypes { * * @param precision Precision. * @param scale Scale. + * @return Native type. 
*/ public static NativeType decimalOf(int precision, int scale) { return new NumericNativeType(precision, scale); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 42039d1..bc653c2 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -208,50 +208,12 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> * registers initial schema from configuration. * * @param tblId Table id. - * @param tblName Table name. - * @return Operation future. - */ - public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, String tblName) { - return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)). - thenCompose(entry -> { - TableConfiguration tblConfig = configurationMgr.configurationRegistry(). - getConfiguration(TablesConfiguration.KEY).tables().get(tblName); - - assert !entry.empty(); - - final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 0); - final int newVer = oldVer + 1; - - final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId); - final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + newVer); - - SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig.value()); - final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, schemaTable); - - return metaStorageMgr.invoke(Conditions.notExists(schemaKey), - Operations.put(schemaKey, ByteUtils.toBytes(desc)), - Operations.noop()) - //TODO: IGNITE-14679 Serialize schema. 
- .thenCompose(res -> metaStorageMgr.invoke( - Conditions.value(lastVerKey).eq(ByteUtils.longToBytes(oldVer)), - Operations.put(lastVerKey, ByteUtils.longToBytes(newVer)), - Operations.noop())); - }); - } - - /** - * Creates schema registry for the table with existed schema or - * registers initial schema from configuration. - * - * @param tblId Table id. - * @param tblName Table name. * @param oldTbl Old table configuration. * @param newTbl New table configuraiton. * @return Operation future. */ public CompletableFuture<Boolean> updateSchemaForTable( final UUID tblId, - String tblName, TableView oldTbl, TableView newTbl ) { diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java index 1da11d9..ae42c64 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java @@ -121,6 +121,9 @@ public class SchemaTableBuilderImpl implements SchemaTableBuilder { /** * Validate indices. + * + * @param indices Table indices. + * @param columns Table columns. 
*/ public static void validateIndices(Collection<TableIndex> indices, Collection<Column> columns) { Set<String> colNames = columns.stream().map(Column::name).collect(Collectors.toSet()); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java index 2125e9f..f0fcecb 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.schema.mapping; import java.io.Serializable; +import org.apache.ignite.internal.schema.SchemaDescriptor; /** * Column mapping helper. @@ -35,12 +36,44 @@ public class ColumnMapping { /** * @param cols Number of columns. + * @return Column mapper builder. */ public static ColumnMapperBuilder mapperBuilder(int cols) { return new ColumnMapperImpl(cols); } /** + * Builds mapper for given schema via merging schema mapper with the provided one. + * Used for building columns mapper between arbitrary schema versions with bottom->top approach. + * + * @param mapping Column mapper. + * @param schema Target schema. + * @return Merged column mapper. + */ + public static ColumnMapper mergeMapping(ColumnMapper mapping, SchemaDescriptor schema) { + if (mapping == identityMapping()) + return schema.columnMapping(); + + else if (schema.columnMapping() == identityMapping()) + return mapping; + + ColumnMapperBuilder builder = mapperBuilder(schema.length()); + + ColumnMapper schemaMapper = schema.columnMapping(); + + for (int i = 0; i < schema.length(); i++) { + int idx = schemaMapper.map(i); + + if (idx < 0) + builder.add(i, -1); + else + builder.add(i, mapping.map(idx)); + } + + return builder.build(); + } + + /** * Stub. 
*/ private ColumnMapping() { diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java index 10ee084..0dc23cd 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java @@ -17,12 +17,17 @@ package org.apache.ignite.internal.schema.registry; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Row; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.schema.mapping.ColumnMapper; +import org.apache.ignite.internal.schema.mapping.ColumnMapping; import org.jetbrains.annotations.Nullable; /** @@ -33,7 +38,10 @@ public class SchemaRegistryImpl implements SchemaRegistry { public static final int INITIAL_SCHEMA_VERSION = -1; /** Cached schemas. */ - private final ConcurrentSkipListMap<Integer, SchemaDescriptor> schemaCache = new ConcurrentSkipListMap<>(); + private final ConcurrentNavigableMap<Integer, SchemaDescriptor> schemaCache = new ConcurrentSkipListMap<>(); + + /** Column mappers cache. */ + private final Map<Long, ColumnMapper> mappingCache = new ConcurrentHashMap<>(); /** Last registered version. */ private volatile int lastVer; @@ -106,9 +114,37 @@ public class SchemaRegistryImpl implements SchemaRegistry { if (curSchema.version() == rowSchema.version()) return new Row(rowSchema, row); - assert rowSchema.version() == curSchema.version() + 1; // TODO: IGNITE-14864 implement merged mapper for arbitraty schema versions. 
+ ColumnMapper mapping = resolveMapping(curSchema, rowSchema); + + return new UpgradingRowAdapter(curSchema, row, mapping); + } + + /** + * @param curSchema Target schema. + * @param rowSchema Row schema. + * @return Column mapper for target schema. + */ + ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor rowSchema) { + assert curSchema.version() > rowSchema.version(); + + if (curSchema.version() == rowSchema.version() + 1) + return curSchema.columnMapping(); + + final long mappingKey = (((long)curSchema.version()) << 32) | (rowSchema.version()); + + ColumnMapper mapping; - return new UpgradingRowAdapter(curSchema, row, rowSchema.columnMapping()); + if ((mapping = mappingCache.get(mappingKey)) != null) + return mapping; + + mapping = schema(rowSchema.version() + 1).columnMapping(); + + for (int i = rowSchema.version() + 2; i <= curSchema.version(); i++) + mapping = ColumnMapping.mergeMapping(mapping, schema(i)); + + mappingCache.putIfAbsent(mappingKey, mapping); + + return mapping; } /** @@ -145,6 +181,16 @@ public class SchemaRegistryImpl implements SchemaRegistry { if (ver >= lastVer || ver <= 0 || schemaCache.keySet().first() < ver) throw new SchemaRegistryException("Incorrect schema version to clean up to: " + ver); - schemaCache.remove(ver); + if (schemaCache.remove(ver) != null) + mappingCache.keySet().removeIf(k -> (k & 0xFFFF_FFFFL) == ver); + } + + /** + * For test purposes only. + * + * @return ColumnMapping cache. 
+ */ + Map<Long, ColumnMapper> mappingCache() { + return mappingCache; } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java index 42cf7e6..7fd0167 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor; */ class UpgradingRowAdapter extends Row { /** Column mapper. */ - private final ColumnMapper mapping; + private final ColumnMapper mapper; /** * @param schema Schema descriptor of new version. @@ -39,12 +39,12 @@ class UpgradingRowAdapter extends Row { UpgradingRowAdapter(SchemaDescriptor schema, BinaryRow row, ColumnMapper mapper) { super(schema, row); - this.mapping = mapper; + this.mapper = mapper; } /** {@inheritDoc} */ @Override protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException { - int mapIdx = mapping.map(colIdx); + int mapIdx = mapper.map(colIdx); return (mapIdx < 0) ? Long.MIN_VALUE : super.findColumn(mapIdx, type); } diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java similarity index 90% rename from modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java rename to modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java index 9750aff..8d34981 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java @@ -15,15 +15,16 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.schema; +package org.apache.ignite.internal.schema.registry; import java.util.Arrays; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.ignite.internal.schema.registry.SchemaRegistrationConflictException; -import org.apache.ignite.internal.schema.registry.SchemaRegistryException; -import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaManager; +import org.apache.ignite.internal.schema.mapping.ColumnMapper; import org.junit.jupiter.api.Test; import static org.apache.ignite.internal.schema.NativeTypes.BYTES; @@ -522,6 +523,63 @@ public class SchemaRegistryImplTest { } /** + * Check schema cache cleanup. + */ + @Test + public void testSchemaCacheCleanup() { + UUID tableId = UUID.randomUUID(); + + final SchemaDescriptor schemaV1 = new SchemaDescriptor(tableId, 1, + new Column[]{new Column("keyLongCol", INT64, true)}, + new Column[]{new Column("valBytesCol", BYTES, true)}); + + final SchemaDescriptor schemaV2 = new SchemaDescriptor(tableId, 2, + new Column[]{new Column("keyLongCol", INT64, true)}, + new Column[]{ + new Column("valBytesCol", BYTES, true), + new Column("valStringCol", STRING, true) + }); + + final SchemaDescriptor schemaV3 = new SchemaDescriptor(tableId, 3, + new Column[]{new Column("keyLongCol", INT64, true)}, + new Column[]{ + new Column("valStringCol", STRING, true) + }); + + final SchemaDescriptor schemaV4 = new SchemaDescriptor(tableId, 4, + new Column[]{new Column("keyLongCol", INT64, true)}, + new Column[]{ + new Column("valBytesCol", BYTES, true), + new Column("valStringCol", STRING, true) + }); + + final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null); + + Map<Long, ColumnMapper> cache = reg.mappingCache(); + + reg.onSchemaRegistered(schemaV1); + 
reg.onSchemaRegistered(schemaV2); + reg.onSchemaRegistered(schemaV3); + reg.onSchemaRegistered(schemaV4); + + assertEquals(0, cache.size()); + + reg.resolveMapping(schemaV4, schemaV1); + reg.resolveMapping(schemaV3, schemaV1); + reg.resolveMapping(schemaV4, schemaV2); + + assertEquals(3, cache.size()); + + reg.onSchemaDropped(schemaV1.version()); + + assertEquals(1, cache.size()); + + reg.onSchemaDropped(schemaV2.version()); + + assertEquals(0, cache.size()); + } + + /** * @param history Table schema history. * @return Schema history map. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 939fd07..116d4ea 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -437,7 +437,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp final int ver = tbl.schemaView().lastSchemaVersion() + 1; if (hasMetastorageLocally) - futs.add(schemaMgr.updateSchemaForTable(tblId, tblName, oldCfg.get(tblName), newCfg.get(tblName))); + futs.add(schemaMgr.updateSchemaForTable(tblId, oldCfg.get(tblName), newCfg.get(tblName))); final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();