This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7f82358 IGNITE-14863: Schema evolution. Add and remove column. (#173) 7f82358 is described below commit 7f82358fa7aeead945991e7e5fcdd9b9d8ffab44 Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com> AuthorDate: Mon Jul 5 17:58:10 2021 +0300 IGNITE-14863: Schema evolution. Add and remove column. (#173) --- .../apache/ignite/table/manager/IgniteTables.java | 9 +- .../configuration/processor/Processor.java | 4 +- .../runner/app/AbstractSchemaChangeTest.java | 158 ++++++++++ .../runner/app/DynamicTableCreationTest.java | 18 +- .../runner/app/SchemaChangeKVViewTest.java | 119 ++++++++ .../runner/app/SchemaChangeTableViewTest.java | 116 +++++++ .../org/apache/ignite/internal/schema/Column.java | 16 +- .../org/apache/ignite/internal/schema/Row.java | 83 +++-- .../ignite/internal/schema/SchemaManager.java | 51 +++- .../ignite/internal/schema/SchemaRegistry.java | 23 +- .../configuration/SchemaDescriptorConverter.java | 38 ++- .../ignite/internal/schema/event/SchemaEvent.java | 3 + .../internal/schema/registry/ColumnMapping.java | 64 ++++ .../schema/registry/SchemaRegistryImpl.java | 73 +++-- .../schema/registry/UpgradingRowAdapter.java | 53 ++++ .../ignite/distributed/ITDistributedTableTest.java | 9 + .../ignite/internal/table/KVBinaryViewImpl.java | 5 +- .../apache/ignite/internal/table/TableImpl.java | 5 +- .../org/apache/ignite/internal/table/TableRow.java | 14 +- .../internal/table/distributed/TableManager.java | 336 +++++++++++++++------ .../ignite/internal/table/event/TableEvent.java | 3 + .../table/TableBinaryViewOperationsTest.java | 44 +-- .../table/impl/DummySchemaManagerImpl.java | 14 + 23 files changed, 1054 insertions(+), 204 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java index 780e9f1..aab5f3e 100644 --- 
a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java +++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java @@ -31,7 +31,6 @@ import org.apache.ignite.table.Table; public interface IgniteTables { /** * Creates a cluster table. - * The table changes if already exists. * * @param name Table name. * @param tableInitChange Table changer. @@ -40,6 +39,14 @@ public interface IgniteTables { Table createTable(String name, Consumer<TableChange> tableInitChange); /** + * Alter a cluster table. + * + * @param name Table name. + * @param tableChange Table changer. + */ + void alterTable(String name, Consumer<TableChange> tableChange); + + /** * Drops a table with the name specified. * * @param name Table name. diff --git a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java index d0ef7e3..43ab153 100644 --- a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java +++ b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java @@ -300,6 +300,7 @@ public class Processor extends AbstractProcessor { .addModifiers(PUBLIC); TypeSpec.Builder changeClsBuilder = TypeSpec.interfaceBuilder(changeClsName) + .addSuperinterface(viewClsName) .addModifiers(PUBLIC); ClassName consumerClsName = ClassName.get(Consumer.class); @@ -337,9 +338,6 @@ public class Processor extends AbstractProcessor { .returns(viewFieldType); viewClsBuilder.addMethod(getMtdBuilder.build()); - - if (valAnnotation != null) - changeClsBuilder.addMethod(getMtdBuilder.build()); } { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java new file mode 100644 index 0000000..c2c560a --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.runner.app; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.app.Ignite; +import org.apache.ignite.app.IgnitionManager; +import org.apache.ignite.internal.app.IgnitionCleaner; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.schema.Column; +import org.apache.ignite.schema.ColumnType; +import org.apache.ignite.schema.SchemaBuilders; +import org.apache.ignite.schema.SchemaTable; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; + +import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Ignition interface tests. 
+ */ +abstract class AbstractSchemaChangeTest { + /** Table name. */ + public static final String TABLE = "PUBLIC.tbl1"; + + /** Nodes bootstrap configuration. */ + private final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {{ + put("node0", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3344,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + + put("node1", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3345,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + + put("node2", "{\n" + + " \"node\": {\n" + + " \"metastorageNodes\":[ \"node0\" ]\n" + + " },\n" + + " \"network\": {\n" + + " \"port\":3346,\n" + + " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" + + " }\n" + + "}"); + }}; + + /** Cluster nodes. */ + private final List<Ignite> clusterNodes = new ArrayList<>(); + + /** + * + */ + @BeforeAll + static void beforeAll() throws Exception { + IgnitionCleaner.removeAllData(); + } + + /** + * + */ + @AfterEach + void afterEach() throws Exception { + IgniteUtils.closeAll(clusterNodes); + + IgnitionCleaner.removeAllData(); + } + + /** + * @return Grid nodes. + */ + @NotNull protected List<Ignite> startGrid() { + List<Ignite> clusterNodes = new ArrayList<>(); + + for (Map.Entry<String, String> nodeBootstrapCfg : nodesBootstrapCfg.entrySet()) + clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(), nodeBootstrapCfg.getValue())); + + assertEquals(3, clusterNodes.size()); + return clusterNodes; + } + + /** + * @param nodes Cluster nodes. + */ + @NotNull protected void createTable(List<Ignite> nodes) { + // Create table on node 0. 
+ SchemaTable schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns( + SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(), + SchemaBuilders.column("valInt", ColumnType.INT32).asNullable().build(), + SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build() + ).withPrimaryKey("key").build(); + + nodes.get(0).tables().createTable( + schTbl1.canonicalName(), + tblCh -> convert(schTbl1, tblCh).changeReplicas(1).changePartitions(10) + ); + } + + /** + * @param nodes Cluster nodes. + * @param columnToAdd Column to add. + */ + protected void addColumn(List<Ignite> nodes, Column columnToAdd) { + nodes.get(0).tables().alterTable(TABLE, + chng -> chng.changeColumns(cols -> { + final int colIdx = chng.columns().size(); + //TODO: avoid 'colIdx' or replace with correct last colIdx. + cols.create(String.valueOf(colIdx), colChg -> convert(columnToAdd, colChg)); + })); + } + + /** + * @param nodes Cluster nodes. + * @param colName Name of column to drop. 
+ */ + protected void dropColumn(List<Ignite> nodes, String colName) { + nodes.get(0).tables().alterTable(TABLE, + chng -> chng.changeColumns(cols -> { + cols.delete(chng.columns().namedListKeys().stream() + .filter(key -> colName.equals(chng.columns().get(key).name())) + .findAny() + .orElseThrow(() -> { + throw new IllegalStateException("Column not found."); + })); + })); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java index 6ca33a2..14f9122 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.app.IgnitionManager; import org.apache.ignite.internal.app.IgnitionCleaner; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter; +import org.apache.ignite.internal.table.SchemaMismatchException; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteLogger; import org.apache.ignite.schema.ColumnType; @@ -131,8 +132,8 @@ class DynamicTableCreationTest { final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", 1L).build(); final Tuple keyTuple2 = kvView2.tupleBuilder().set("key", 2L).build(); - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple1).value("key")); - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple1).value("key")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple1).value("key")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple1).value("key")); assertEquals(1, (Long)tbl2.get(keyTuple1).value("key")); assertEquals(2, 
(Long)tbl2.get(keyTuple2).value("key")); @@ -140,6 +141,11 @@ class DynamicTableCreationTest { assertEquals(111, (Integer)kvView2.get(keyTuple1).value("val")); assertEquals(222, (Integer)tbl2.get(keyTuple2).value("val")); assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val")); + + assertThrows(SchemaMismatchException.class, () -> tbl1.get(keyTuple1).value("key")); + assertThrows(SchemaMismatchException.class, () -> kvView1.get(keyTuple1).value("key")); + assertThrows(SchemaMismatchException.class, () -> tbl1.get(keyTuple1).value("val")); + assertThrows(SchemaMismatchException.class, () -> kvView1.get(keyTuple1).value("val")); } /** @@ -193,10 +199,10 @@ class DynamicTableCreationTest { final Tuple keyTuple2 = tbl2.tupleBuilder().set("key", uuid2).set("affKey", 4242L).build(); // KV view must NOT return key columns in value. - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple1).value("key")); - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple1).value("affKey")); - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple2).value("key")); - assertThrows(IllegalArgumentException.class, () -> kvView2.get(keyTuple2).value("affKey")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple1).value("key")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple1).value("affKey")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple2).value("key")); + assertThrows(SchemaMismatchException.class, () -> kvView2.get(keyTuple2).value("affKey")); // Record binary view MUST return key columns in value. 
assertEquals(uuid, tbl2.get(keyTuple1).value("key")); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java new file mode 100644 index 0000000..e0c2101 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.runner.app; + +import java.util.List; +import org.apache.ignite.app.Ignite; +import org.apache.ignite.internal.table.ColumnNotFoundException; +import org.apache.ignite.schema.ColumnType; +import org.apache.ignite.schema.SchemaBuilders; +import org.apache.ignite.table.KeyValueBinaryView; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Ignition interface tests. 
+ */ +@Disabled("https://issues.apache.org/jira/browse/IGNITE-14581") +class SchemaChangeKVViewTest extends AbstractSchemaChangeTest { + /** + * Check that a column can be dropped from the table schema. + */ + @Test + public void testDropColumn() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView(); + + { + kvView.put(kvView.tupleBuilder().set("key", 1L).build(), + kvView.tupleBuilder().set("valInt", 111).set("valStr", "str").build()); + } + + dropColumn(grid, "valStr"); + + { + // Check old row conversion. + final Tuple keyTuple = kvView.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)kvView.get(keyTuple).value("valInt")); + assertThrows(ColumnNotFoundException.class, () -> kvView.get(keyTuple).value("valStr")); + + // Check tuple of outdated schema. + assertThrows(ColumnNotFoundException.class, () -> kvView.put( + kvView.tupleBuilder().set("key", 2L).build(), + kvView.tupleBuilder().set("valInt", -222).set("valStr", "str").build()) + ); + + // Check tuple of correct schema. + kvView.put(kvView.tupleBuilder().set("key", 2L).build(), kvView.tupleBuilder().set("valInt", 222).build()); + + final Tuple keyTuple2 = kvView.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt")); + assertThrows(ColumnNotFoundException.class, () -> kvView.get(keyTuple2).value("valStr")); + } + } + + + /** + * Check that a new column can be added to the table schema. 
+ */ + @Test + public void testAddNewColumn() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView(); + + { + kvView.put(kvView.tupleBuilder().set("key", 1L).build(), kvView.tupleBuilder().set("valInt", 111).build()); + + assertThrows(ColumnNotFoundException.class, () -> kvView.put( + kvView.tupleBuilder().set("key", 1L).build(), + kvView.tupleBuilder().set("valInt", -111).set("valStrNew", "str").build()) + ); + } + + addColumn(grid, SchemaBuilders.column("valStrNew", ColumnType.string()).asNullable().withDefaultValue("default").build()); + + { + // Check old row conversion. + Tuple keyTuple = kvView.tupleBuilder().set("key", 1L).build(); + + assertEquals(111, (Integer)kvView.get(keyTuple).value("valInt")); + assertEquals("default", kvView.get(keyTuple).value("valStrNew")); + + // Check tuple of new schema. + kvView.put(kvView.tupleBuilder().set("key", 2L).build(), + kvView.tupleBuilder().set("valInt", 222).set("valStrNew", "str").build()); + + Tuple keyTuple2 = kvView.tupleBuilder().set("key", 2L).build(); + + assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt")); + assertEquals("str", kvView.get(keyTuple2).value("valStrNew")); + } + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java new file mode 100644 index 0000000..d68ea3f --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.runner.app; + +import java.util.List; +import org.apache.ignite.app.Ignite; +import org.apache.ignite.internal.table.ColumnNotFoundException; +import org.apache.ignite.schema.ColumnType; +import org.apache.ignite.schema.SchemaBuilders; +import org.apache.ignite.table.Table; +import org.apache.ignite.table.Tuple; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Schema change (add/drop column) tests for the table view. + */ +@Disabled("https://issues.apache.org/jira/browse/IGNITE-14581") +class SchemaChangeTableViewTest extends AbstractSchemaChangeTest { + /** + * Check that a column can be dropped from the table schema. + */ + @Test + public void testDropColumn() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + final Table tbl = grid.get(1).tables().table(TABLE); + + { + tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 111).set("valStr", "str").build()); + } + + dropColumn(grid, "valStr"); + + { + // Check old row conversion. 
+ final Tuple keyTuple = tbl.tupleBuilder().set("key", 1L).build(); + + assertEquals(1, (Long)tbl.get(keyTuple).value("key")); + assertEquals(111, (Integer)tbl.get(keyTuple).value("valInt")); + assertThrows(ColumnNotFoundException.class, () -> tbl.get(keyTuple).value("valStr")); + + // Check tuple of outdated schema. + assertThrows(ColumnNotFoundException.class, + () -> tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", -222).set("valStr", "str").build()) + ); + + // Check tuple of correct schema. + tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 222).build()); + + final Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build(); + + assertEquals(2, (Long)tbl.get(keyTuple2).value("key")); + assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt")); + assertThrows(ColumnNotFoundException.class, () -> tbl.get(keyTuple2).value("valStr")); + } + } + + /** + * Check that a new column can be added to the table schema. + */ + @Test + public void testAddNewColumn() { + List<Ignite> grid = startGrid(); + + createTable(grid); + + Table tbl = grid.get(1).tables().table(TABLE); + + { + tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 111).build()); + + assertThrows(ColumnNotFoundException.class, + () -> tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", -111).set("valStrNew", "str").build()) + ); + } + + addColumn(grid, SchemaBuilders.column("valStrNew", ColumnType.string()).asNullable().withDefaultValue("default").build()); + + // Check old row conversion. + Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build(); + + assertEquals(1, (Long)tbl.get(keyTuple1).value("key")); + assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt")); + assertEquals("default", tbl.get(keyTuple1).value("valStrNew")); + + // Check tuple of new schema. 
+ tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 222).set("valStrNew", "str").build()); + + Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build(); + + assertEquals(2, (Long)tbl.get(keyTuple2).value("key")); + assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt")); + assertEquals("str", tbl.get(keyTuple2).value("valStrNew")); + } +} diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java index 815f469..72f778d 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal.schema; +import org.apache.ignite.internal.tostring.S; +import org.jetbrains.annotations.Nullable; import java.io.Serializable; import java.util.function.Supplier; -import org.apache.ignite.internal.tostring.S; -import org.jetbrains.annotations.NotNull; /** * Column description for a type schema. Column contains a column name, a column type and a nullability flag. @@ -75,7 +75,7 @@ public class Column implements Comparable<Column>, Serializable { String name, NativeType type, boolean nullable, - @NotNull Supplier<Object> defValSup + @Nullable Supplier<Object> defValSup ) { this(-1, name, type, nullable, defValSup); } @@ -87,12 +87,12 @@ public class Column implements Comparable<Column>, Serializable { * @param nullable If {@code false}, null values will not be allowed for this column. * @param defValSup Default value supplier. 
*/ - Column( + private Column( int schemaIndex, String name, NativeType type, boolean nullable, - @NotNull Supplier<Object> defValSup + @Nullable Supplier<Object> defValSup ) { this.schemaIndex = schemaIndex; this.name = name; @@ -137,9 +137,10 @@ public class Column implements Comparable<Column>, Serializable { public Object defaultValue() { Object val = defValSup.get(); - assert nullable || val != null : "Null value is not accepted for not nullable column: [col=" + this + ']'; + if (nullable || val != null) + return val; - return val; + throw new IllegalStateException("Null value is not accepted for not nullable column: [col=" + this + ']'); } /** {@inheritDoc} */ @@ -173,6 +174,7 @@ public class Column implements Comparable<Column>, Serializable { /** * Validate the object by column's constraint. + * * @param val Object to validate. */ public void validate(Object val) { diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java index 973e8b2..82aab99 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java @@ -33,7 +33,7 @@ import java.util.UUID; */ public class Row implements BinaryRow { /** Schema descriptor. */ - private final SchemaDescriptor schema; + protected final SchemaDescriptor schema; /** Binary row. */ private final BinaryRow row; @@ -53,8 +53,6 @@ public class Row implements BinaryRow { * @param row Binary row representation. */ public Row(SchemaDescriptor schema, BinaryRow row) { - assert row.schemaVersion() == schema.version(); - this.row = row; this.schema = schema; } @@ -83,7 +81,10 @@ public class Row implements BinaryRow { public byte byteValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.BYTE); - return off < 0 ? 0 : readByte(offset(off)); + if (off < 0) + return off == -1 ? 
0 : (byte)schema.column(col).defaultValue(); + + return readByte(offset(off)); } /** @@ -96,7 +97,10 @@ public class Row implements BinaryRow { public Byte byteValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.BYTE); - return off < 0 ? null : readByte(offset(off)); + if (off < 0) + return off == -1 ? null : (Byte)schema.column(col).defaultValue(); + + return readByte(offset(off)); } /** @@ -109,7 +113,10 @@ public class Row implements BinaryRow { public short shortValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.SHORT); - return off < 0 ? 0 : readShort(offset(off)); + if (off < 0) + return off == -1 ? 0 : (short)schema.column(col).defaultValue(); + + return readShort(offset(off)); } /** @@ -122,7 +129,10 @@ public class Row implements BinaryRow { public Short shortValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.SHORT); - return off < 0 ? null : readShort(offset(off)); + if (off < 0) + return off == -1 ? null : (Short)schema.column(col).defaultValue(); + + return readShort(offset(off)); } /** @@ -135,7 +145,10 @@ public class Row implements BinaryRow { public int intValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.INTEGER); - return off < 0 ? 0 : readInteger(offset(off)); + if (off < 0) + return off == -1 ? 0 : (int)schema.column(col).defaultValue(); + + return readInteger(offset(off)); } /** @@ -148,7 +161,10 @@ public class Row implements BinaryRow { public Integer intValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.INTEGER); - return off < 0 ? null : readInteger(offset(off)); + if (off < 0) + return off == -1 ? 
null : (Integer)schema.column(col).defaultValue(); + + return readInteger(offset(off)); } /** @@ -161,7 +177,10 @@ public class Row implements BinaryRow { public long longValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.LONG); - return off < 0 ? 0 : readLong(offset(off)); + if (off < 0) + return off == -1 ? 0L : (long)schema.column(col).defaultValue(); + + return readLong(offset(off)); } /** @@ -174,7 +193,10 @@ public class Row implements BinaryRow { public Long longValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.LONG); - return off < 0 ? null : readLong(offset(off)); + if (off < 0) + return off == -1 ? null : (Long)schema.column(col).defaultValue(); + + return readLong(offset(off)); } /** @@ -187,7 +209,10 @@ public class Row implements BinaryRow { public float floatValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.FLOAT); - return off < 0 ? 0.f : readFloat(offset(off)); + if (off < 0) + return off == -1 ? 0.f : (float)schema.column(col).defaultValue(); + + return readFloat(offset(off)); } /** @@ -200,7 +225,10 @@ public class Row implements BinaryRow { public Float floatValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.FLOAT); - return off < 0 ? null : readFloat(offset(off)); + if (off < 0) + return off == -1 ? null : (Float)schema.column(col).defaultValue(); + + return readFloat(offset(off)); } /** @@ -213,7 +241,10 @@ public class Row implements BinaryRow { public double doubleValue(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.DOUBLE); - return off < 0 ? 0.d : readDouble(offset(off)); + if (off < 0) + return off == -1 ? 
0.d : (double)schema.column(col).defaultValue(); + + return readDouble(offset(off)); } /** @@ -226,7 +257,10 @@ public class Row implements BinaryRow { public Double doubleValueBoxed(int col) throws InvalidTypeException { long off = findColumn(col, NativeTypeSpec.DOUBLE); - return off < 0 ? null : readDouble(offset(off)); + if (off < 0) + return off == -1 ? null : (Double)schema.column(col).defaultValue(); + + return readDouble(offset(off)); } /** @@ -252,7 +286,7 @@ public class Row implements BinaryRow { long offLen = findColumn(col, NativeTypeSpec.STRING); if (offLen < 0) - return null; + return offLen == -1 ? null : (String)rowSchema().column(col).defaultValue(); int off = offset(offLen); int len = length(offLen); @@ -271,7 +305,7 @@ public class Row implements BinaryRow { long offLen = findColumn(col, NativeTypeSpec.BYTES); if (offLen < 0) - return null; + return offLen == -1 ? null : (byte[])schema.column(col).defaultValue(); int off = offset(offLen); int len = length(offLen); @@ -287,12 +321,12 @@ public class Row implements BinaryRow { * @throws InvalidTypeException If actual column type does not match the requested column type. */ public UUID uuidValue(int col) throws InvalidTypeException { - long found = findColumn(col, NativeTypeSpec.UUID); + long offLen = findColumn(col, NativeTypeSpec.UUID); - if (found < 0) - return null; + if (offLen < 0) + return offLen == -1 ? null : (UUID)schema.column(col).defaultValue(); - int off = offset(found); + int off = offset(offLen); long lsb = readLong(off); long msb = readLong(off + 8); @@ -311,7 +345,7 @@ public class Row implements BinaryRow { long offLen = findColumn(col, NativeTypeSpec.BITMASK); if (offLen < 0) - return null; + return offLen == -1 ? null : (BitSet)schema.column(col).defaultValue(); int off = offset(offLen); int len = columnLength(col); @@ -440,7 +474,8 @@ public class Row implements BinaryRow { * @param hasNullMap Has null map flag. 
* @return Encoded offset (from the row start) and length of the column with the given index. */ - private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasVarTbl, boolean hasNullMap) { + private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int idx, boolean hasVarTbl, + boolean hasNullMap) { int vartableOff = baseOff + CHUNK_LEN_FIELD_SIZE; int numNullsBefore = 0; @@ -560,7 +595,7 @@ public class Row implements BinaryRow { /** {@inheritDoc} */ @Override public int schemaVersion() { - return row.schemaVersion(); + return schema.version(); } /** {@inheritDoc} */ diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index efc101b..62a35cb 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -53,7 +53,7 @@ import org.jetbrains.annotations.NotNull; /** * Schema Manager. - * + * <p> * Schemas MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps, * otherwise an exception will be thrown. The version numbering starts from the {@code 1}. 
* <p> @@ -113,7 +113,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> if (verPos == -1) { final UUID tblId = UUID.fromString(keyTail); - SchemaRegistry reg = schemaRegistryForTable(tblId); + SchemaRegistryImpl reg = schemaRegistryForTable(tblId); assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId; @@ -124,6 +124,13 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> onEvent(SchemaEvent.DROPPED, new SchemaEventParameters(tblId, null), null); } + else { + int v = (int)ByteUtils.bytesToLong(evt.newEntry().value(),0); + + assert reg.lastSchemaVersion() == v; + + onEvent(SchemaEvent.CHANGED, new SchemaEventParameters(tblId, reg), null); + } return true; // Ignore last table schema version. } @@ -191,6 +198,42 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> } /** + * Creates schema registry for the table with existed schema or + * registers initial schema from configuration. + * + * @param tblId Table id. + * @param tblName Table name. + * @return Operation future. + */ + public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, String tblName) { + return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)). + thenCompose(entry -> { + TableConfiguration tblConfig = configurationMgr.configurationRegistry(). 
+ getConfiguration(TablesConfiguration.KEY).tables().get(tblName); + + assert !entry.empty(); + + final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 0); + final int newVer = oldVer + 1; + + final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId); + final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + newVer); + + SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig); + final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, schemaTable); + + return metaStorageMgr.invoke(Conditions.notExists(schemaKey), + Operations.put(schemaKey, ByteUtils.toBytes(desc)), + Operations.noop()) + //TODO: IGNITE-14679 Serialize schema. + .thenCompose(res -> metaStorageMgr.invoke( + Conditions.value(lastVerKey).eq(ByteUtils.longToBytes(oldVer)), + Operations.put(lastVerKey, ByteUtils.longToBytes(newVer)), + Operations.noop())); + }); + } + + /** * Return table schema of certain version from history. * * @param tblId Table id. @@ -231,8 +274,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters> * @param tableId Table id. * @return Schema registry for the table. 
*/ - private SchemaRegistry schemaRegistryForTable(UUID tableId) { - final SchemaRegistry reg = schemaRegs.get(tableId); + private SchemaRegistryImpl schemaRegistryForTable(UUID tableId) { + final SchemaRegistryImpl reg = schemaRegs.get(tableId); if (reg == null) throw new SchemaRegistryException("No schema was ever registered for the table: " + tableId); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java index 777adf1..ea1ad4e 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java @@ -25,14 +25,29 @@ import org.jetbrains.annotations.NotNull; */ public interface SchemaRegistry { /** - * @return Current schema. + * Gets schema descriptor for the latest version if initialized. + * + * @return Schema descriptor if initialized, {@code null} otherwise. */ SchemaDescriptor schema(); /** - * @param ver Schema version. - * @return Schema of given version. - * @throws SchemaRegistryException If schema was not found. + * @return Last registered schema version. + */ + public int lastSchemaVersion(); + + /** + * Gets schema descriptor for given version. + * + * @param ver Schema version to get descriptor for. + * @return Schema descriptor of given version. + * @throws SchemaRegistryException If no schema found for given version. */ @NotNull SchemaDescriptor schema(int ver) throws SchemaRegistryException; + + /** + * @param row Binary row. + * @return Schema-aware row.
+ */ + Row resolve(BinaryRow row); } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java index 29f288f..400d0c3 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java @@ -17,19 +17,19 @@ package org.apache.ignite.internal.schema.configuration; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.InvalidTypeException; import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; - import org.apache.ignite.schema.ColumnType; import org.apache.ignite.schema.SchemaTable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - import static org.apache.ignite.internal.schema.NativeTypes.BYTE; import static org.apache.ignite.internal.schema.NativeTypes.DOUBLE; import static org.apache.ignite.internal.schema.NativeTypes.FLOAT; @@ -86,7 +86,7 @@ public class SchemaDescriptorConverter { return UUID; case BITMASK: - return NativeTypes.bitmaskOf(((ColumnType.VarLenColumnType) colType).length()); + return NativeTypes.bitmaskOf(((ColumnType.VarLenColumnType)colType).length()); case STRING: int strLen = ((ColumnType.VarLenColumnType)colType).length(); @@ -116,7 +116,7 @@ public class SchemaDescriptorConverter { * @return Internal Column. 
*/ private static Column convert(org.apache.ignite.schema.Column colCfg) { - return new Column(colCfg.name(), convert(colCfg.type()), colCfg.nullable()); + return new Column(colCfg.name(), convert(colCfg.type()), colCfg.nullable(), new ConstantSupplier((Serializable)colCfg.defaultValue())); } /** @@ -132,7 +132,7 @@ public class SchemaDescriptorConverter { Column[] keyCols = new Column[keyColsCfg.size()]; - for (int i = 0;i < keyCols.length;i++) + for (int i = 0; i < keyCols.length; i++) keyCols[i] = convert(keyColsCfg.get(i)); String[] affCols = tblCfg.affinityColumns().stream().map(org.apache.ignite.schema.Column::name) @@ -142,9 +142,29 @@ public class SchemaDescriptorConverter { Column[] valCols = new Column[valColsCfg.size()]; - for (int i = 0;i < valCols.length;i++) + for (int i = 0; i < valCols.length; i++) valCols[i] = convert(valColsCfg.get(i)); return new SchemaDescriptor(tblId, schemaVer, keyCols, affCols, valCols); } + + /** + * Constant value supplier. + */ + private static class ConstantSupplier implements Supplier<Object>, Serializable { + /** Value. */ + private final Serializable val; + + /** + * @param val Value. + */ + ConstantSupplier(Serializable val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Object get() { + return val; + } + } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java index d81e0d2..6d27985 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java @@ -26,6 +26,9 @@ public enum SchemaEvent implements Event { /** This event is fired when a schema was initialized. */ INITIALIZED, + /** This event is fired when a schema was changed. */ + CHANGED, + /** This event is fired when a schema was dropped.
*/ DROPPED } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java new file mode 100644 index 0000000..dbb95a0 --- /dev/null +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema.registry; + +import org.apache.ignite.internal.schema.SchemaDescriptor; + +/** + * Column mapping. + */ +class ColumnMapping { + /** Mapping. */ + private final int[] mapping; + + /** First mapped column index. */ + private final int firstColIdx; + + /** + * @param schema Source schema descriptor. + */ + ColumnMapping(SchemaDescriptor schema) { + firstColIdx = schema.keyColumns().length(); + mapping = new int[schema.valueColumns().length()]; + } + + /** + * Add column mapping. + * + * @param from Column index in source schema. + * @param to Column index in schema. + */ + void add(int from, int to) { + assert from >= firstColIdx && from < firstColIdx + mapping.length; + + mapping[from - firstColIdx] = to; + } + + /** + * Gets mapped column idx. 
+ * + * @param idx Column index in source. + * @return Column index in target schema. + */ + int map(int idx) { + if (idx < firstColIdx) + return idx; + + return mapping[idx - firstColIdx]; + } +} diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java index f9d9e64..a2fac90 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java @@ -17,8 +17,14 @@ package org.apache.ignite.internal.schema.registry; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.Columns; +import org.apache.ignite.internal.schema.InvalidTypeException; +import org.apache.ignite.internal.schema.Row; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.jetbrains.annotations.Nullable; @@ -60,13 +66,7 @@ public class SchemaRegistryImpl implements SchemaRegistry { this.history = history; } - /** - * Gets schema descriptor for given version. - * - * @param ver Schema version to get descriptor for. - * @return Schema descriptor. - * @throws SchemaRegistryException If no schema found for given version. - */ + /** {@inheritDoc} */ @Override public SchemaDescriptor schema(int ver) { SchemaDescriptor desc = schemaCache.get(ver); @@ -87,26 +87,65 @@ public class SchemaRegistryImpl implements SchemaRegistry { throw new SchemaRegistryException("Failed to find schema: ver=" + ver); } - /** - * Gets schema descriptor for the latest version if initialized.
- * - * @return Schema descriptor if initialized, {@code null} otherwise. - * @throws SchemaRegistryException If failed. - */ + /** {@inheritDoc} */ @Override public @Nullable SchemaDescriptor schema() { final int lastVer0 = lastVer; if (lastVer0 == INITIAL_SCHEMA_VERSION) return null; - return schema(lastVer0); + return schema(lastVer0); + } + + /** {@inheritDoc} */ + @Override public int lastSchemaVersion() { + return lastVer; + } + + /** {@inheritDoc} */ + @Override public Row resolve(BinaryRow row) { + final SchemaDescriptor rowSchema = schema(row.schemaVersion()); + final SchemaDescriptor curSchema = schema(); + + if (curSchema.version() == rowSchema.version()) + return new Row(rowSchema, row); + + return new UpgradingRowAdapter(curSchema, row, columnMapper(curSchema, rowSchema)); } /** - * @return Last known schema version. + * Create column mapping for schemas. + * + * @param src Source schema of newer version. + * @param dst Target schema of older version. + * @return Column mapping. */ - public int lastSchemaVersion() { - return lastVer; + private ColumnMapping columnMapper(SchemaDescriptor src, SchemaDescriptor dst) { + assert src.version() > dst.version(); + assert src.version() == dst.version() + 1; // TODO: IGNITE-14863 implement merged mapper for arbitrary schema versions.
+ + final Columns srcCols = src.valueColumns(); + final Columns dstCols = dst.valueColumns(); + + final ColumnMapping mapping = new ColumnMapping(src); + + for (int i = 0; i < srcCols.columns().length; i++) { + final Column col = srcCols.column(i); + + try { + final int idx = dstCols.columnIndex(col.name()); + + if (!col.equals(dstCols.column(idx))) + throw new InvalidTypeException("Column of incompatible type: [colIdx=" + col.schemaIndex() + ", schemaVer=" + src.version()); + + mapping.add(col.schemaIndex(), dst.keyColumns().length() + idx); + } + catch (NoSuchElementException ex) { + mapping.add(col.schemaIndex(), -1); + } + } + + return mapping; } /** diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java new file mode 100644 index 0000000..8c9a4fc --- /dev/null +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.schema.registry; + +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.InvalidTypeException; +import org.apache.ignite.internal.schema.NativeTypeSpec; +import org.apache.ignite.internal.schema.Row; +import org.apache.ignite.internal.schema.SchemaDescriptor; + +/** + * Adapter for row of older schema. + */ +class UpgradingRowAdapter extends Row { + /** Column mapper. */ + private final ColumnMapping mapping; + + /** + * @param schema Schema descriptor of new version. + * @param row Row. + * @param mapping Column mapping. + */ + UpgradingRowAdapter(SchemaDescriptor schema, BinaryRow row, ColumnMapping mapping) { + super(schema, row); + + this.mapping = mapping; + } + + /** {@inheritDoc} */ + @Override protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException { + if (schema.isKeyColumn(colIdx)) + return super.findColumn(colIdx, type); + + int mapIdx = mapping.map(colIdx); + + return (mapIdx < 0) ? 
Long.MIN_VALUE : super.findColumn(mapIdx, type); + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java index e3dd181..515889c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.internal.affinity.RendezvousAffinityFunction; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; @@ -253,9 +254,17 @@ public class ITDistributedTableTest { return SCHEMA; } + @Override public int lastSchemaVersion() { + return SCHEMA.version(); + } + @Override public SchemaDescriptor schema(int ver) { return SCHEMA; } + + @Override public Row resolve(BinaryRow row) { + return new Row(SCHEMA, row); + } }); partitionedTableView(tbl, PARTS * 10); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java index be1c3ac..2ba0bbe 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Row; -import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.marshaller.TupleMarshaller; import 
org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.table.InvokeProcessor; @@ -307,9 +306,9 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar if (row == null) return null; - final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion()); + final Row wrapped = schemaReg.resolve(row); - return new TableRow(schema, new Row(schema, row)); + return new TableRow(wrapped.rowSchema(), wrapped); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index 99c8c8d..d4bdc23 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Row; -import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.marshaller.TupleMarshaller; import org.apache.ignite.table.InvokeProcessor; @@ -395,9 +394,9 @@ public class TableImpl extends AbstractTableView implements Table { if (row == null) return null; - final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion()); + final Row wrapped = schemaReg.resolve(row); - return new TableRow(schema, new Row(schema, row)); + return new TableRow(wrapped.rowSchema(), wrapped); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java index 74ed57a..8814951 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java @@ -27,7 +27,7 @@ import 
org.jetbrains.annotations.Nullable; /** * Row to Tuple adapter. - * + * <p> * Provides methods to access columns values by column names. */ public class TableRow extends RowChunkAdapter { @@ -57,7 +57,7 @@ public class TableRow extends RowChunkAdapter { final Column col = schema.column(colName); if (col == null) - throw new IllegalArgumentException("Invalid column name: columnName=" + colName + ", schemaVersion=" + schema.version()); + throw new ColumnNotFoundException("Invalid column name: columnName=" + colName + ", schemaVersion=" + schema.version()); return col; } @@ -81,7 +81,7 @@ public class TableRow extends RowChunkAdapter { return row; } - /** */ + /** {@inheritDoc} */ @Override public boolean contains(String colName) { return schema.column(colName) != null; } @@ -100,12 +100,12 @@ public class TableRow extends RowChunkAdapter { final Column col = schema.column(colName); if (col == null || !schema.isKeyColumn(col.schemaIndex())) - throw new IllegalArgumentException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version()); + throw new ColumnNotFoundException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version()); return col; } - /** */ + /** {@inheritDoc} */ @Override public boolean contains(String colName) { return schema.column(colName) != null; } @@ -125,12 +125,12 @@ public class TableRow extends RowChunkAdapter { final Column col = schema.column(colName); if (col == null || schema.isKeyColumn(col.schemaIndex())) - throw new IllegalArgumentException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version()); + throw new ColumnNotFoundException("Invalid value column name: columnName=" + colName + ", schemaVersion=" + schema.version()); return col; } - /** */ + /** {@inheritDoc} */ @Override public boolean contains(String colName) { return schema.column(colName) != null; } diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 4b14ad9..a535c11 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -31,9 +31,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.ignite.configuration.schemas.table.TableChange; import org.apache.ignite.configuration.schemas.table.TableView; import org.apache.ignite.configuration.schemas.table.TablesConfiguration; +import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.internal.affinity.AffinityManager; import org.apache.ignite.internal.affinity.event.AffinityEvent; import org.apache.ignite.internal.affinity.event.AffinityEventParameters; @@ -199,135 +201,240 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> { Set<String> tablesToStart = (ctx.newValue() == null || ctx.newValue().namedListKeys() == null) ? - Collections.emptySet() : new HashSet<>(ctx.newValue().namedListKeys()); + Collections.emptySet() : + ctx.newValue().namedListKeys().stream().filter(t -> !ctx.oldValue().namedListKeys().contains(t)).collect(Collectors.toSet()); - tablesToStart.removeAll(ctx.oldValue().namedListKeys()); - - long revision = ctx.storageRevision(); + Set<String> tablesToStop = (ctx.oldValue() == null || ctx.oldValue().namedListKeys() == null) ? 
+ Collections.emptySet() : + ctx.oldValue().namedListKeys().stream().filter(t -> !ctx.newValue().namedListKeys().contains(t)).collect(Collectors.toSet()); List<CompletableFuture<Boolean>> futs = new ArrayList<>(); - boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr); + final Set<String> schemaChanged = + (ctx.oldValue() != null && ctx.oldValue().namedListKeys() != null && ctx.newValue() != null && ctx.newValue().namedListKeys() != null) ? + ctx.oldValue().namedListKeys().stream() + .filter(tblName -> ctx.newValue().namedListKeys().contains(tblName)) // Filter changed tables. + .filter(tblName -> { + final TableView newTbl = ctx.newValue().get(tblName); + final TableView oldTbl = ctx.oldValue().get(tblName); - for (String tblName : tablesToStart) { - TableView tableView = ctx.newValue().get(tblName); + assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null; - UUID tblId = new UUID(revision, 0L); + return newTbl.columns().namedListKeys().stream().anyMatch(c -> !oldTbl.columns().namedListKeys().contains(c)) || + oldTbl.columns().namedListKeys().stream().anyMatch(c -> !newTbl.columns().namedListKeys().contains(c)); + }).collect(Collectors.toSet()) : + Collections.emptySet(); - if (hasMetastorageLocally) { - var key = new ByteArray(INTERNAL_PREFIX + tblId); - futs.add(metaStorageMgr.invoke( - Conditions.notExists(key), - Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)), - Operations.noop()) - .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name())) - .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name()))); - } + if (!tablesToStart.isEmpty()) + futs.addAll(startTables(tablesToStart, ctx.storageRevision(), ctx.newValue())); - final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>(); - final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>(); + if (!schemaChanged.isEmpty()) 
+ futs.addAll(changeSchema(schemaChanged)); - CompletableFuture.allOf(affinityReadyFut, schemaReadyFut) - .exceptionally(e -> { - LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e); + if (!tablesToStop.isEmpty()) + futs.addAll(stopTables(tablesToStop)); - onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e); + return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)); + }); + } - return null; - }) - .thenRun(() -> createTableLocally( - tblName, - tblId, - affinityReadyFut.join().assignment(), - schemaReadyFut.join().schemaRegistry() - )); + /** + * Start tables routine. + * + * @param tbls Tables to start. + * @param rev Metastore revision. + * @param cfgs Table configurations. + * @return Table creation futures. + */ + private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, long rev, NamedListView<TableView> cfgs) { + boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr); + + List<CompletableFuture<Boolean>> futs = new ArrayList<>(); - affMgr.listen(AffinityEvent.CALCULATED, new EventListener<AffinityEventParameters>() { - @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) { - if (!tblId.equals(parameters.tableId())) - return false; + for (String tblName : tbls) { + TableView tableView = cfgs.get(tblName); + + UUID tblId = new UUID(rev, 0L); + + if (hasMetastorageLocally) { + var key = new ByteArray(INTERNAL_PREFIX + tblId); + futs.add(metaStorageMgr.invoke( + Conditions.notExists(key), + Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)), + Operations.noop()) + .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name())) + .thenCompose(res -> affMgr.calculateAssignments(tblId, tableView.name()))); + } - if (e == null) - affinityReadyFut.complete(parameters); - else - affinityReadyFut.completeExceptionally(e); + final CompletableFuture<AffinityEventParameters> 
affinityReadyFut = new CompletableFuture<>(); + final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>(); - return true; - } + CompletableFuture.allOf(affinityReadyFut, schemaReadyFut) + .exceptionally(e -> { + LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e); - @Override public void remove(@NotNull Throwable e) { + onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e); + + return null; + }) + .thenRun(() -> createTableLocally( + tblName, + tblId, + affinityReadyFut.join().assignment(), + schemaReadyFut.join().schemaRegistry() + )); + + affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() { + @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) { + if (!tblId.equals(parameters.tableId())) + return false; + + if (e == null) + affinityReadyFut.complete(parameters); + else affinityReadyFut.completeExceptionally(e); - } - }); - schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<SchemaEventParameters>() { - @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) { - if (!tblId.equals(parameters.tableId())) - return false; + return true; + } - if (e == null) - schemaReadyFut.complete(parameters); - else - schemaReadyFut.completeExceptionally(e); + @Override public void remove(@NotNull Throwable e) { + affinityReadyFut.completeExceptionally(e); + } + }); - return true; - } + schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() { + @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) { + if (!tblId.equals(parameters.tableId()) && parameters.schemaRegistry().lastSchemaVersion() >= 1) + return false; - @Override public void remove(@NotNull Throwable e) { + if (e == null) + schemaReadyFut.complete(parameters); + else schemaReadyFut.completeExceptionally(e); - } - }); - } - Set<String> tablesToStop = (ctx.oldValue() == null || 
ctx.oldValue().namedListKeys() == null) ? - Collections.emptySet() : new HashSet<>(ctx.oldValue().namedListKeys()); + return true; + } - tablesToStop.removeAll(ctx.newValue().namedListKeys()); + @Override public void remove(@NotNull Throwable e) { + schemaReadyFut.completeExceptionally(e); + } + }); + } - for (String tblName : tablesToStop) { - TableImpl t = tables.get(tblName); + return futs; + } + + /** + * Drop tables routine. + * + * @param tbls Tables to drop. + * @return Table drop futures. + */ + private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) { + boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr); - UUID tblId = t.tableId(); + List<CompletableFuture<Boolean>> futs = new ArrayList<>(); - if (hasMetastorageLocally) { - var key = new ByteArray(INTERNAL_PREFIX + tblId); + for (String tblName : tbls) { + TableImpl t = tables.get(tblName); - futs.add(affMgr.removeAssignment(tblId) - .thenCompose(res -> schemaMgr.unregisterSchemas(tblId)) - .thenCompose(res -> - metaStorageMgr.invoke(Conditions.exists(key), - Operations.remove(key), - Operations.noop()))); - } + UUID tblId = t.tableId(); - affMgr.listen(AffinityEvent.REMOVED, new EventListener<AffinityEventParameters>() { - @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) { - if (!tblId.equals(parameters.tableId())) - return false; + if (hasMetastorageLocally) { + var key = new ByteArray(INTERNAL_PREFIX + tblId); - if (e == null) - dropTableLocally(tblName, tblId, parameters.assignment()); - else - onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e); + futs.add(affMgr.removeAssignment(tblId) + .thenCompose(res -> schemaMgr.unregisterSchemas(tblId)) + .thenCompose(res -> + metaStorageMgr.invoke(Conditions.exists(key), + Operations.remove(key), + Operations.noop()))); + } - return true; - } + affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() { + @Override public boolean 
notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) { + if (!tblId.equals(parameters.tableId())) + return false; - @Override public void remove(@NotNull Throwable e) { + if (e == null) + dropTableLocally(tblName, tblId, parameters.assignment()); + else onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e); - } - }); - } - return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new)); - }); + return true; + } + + @Override public void remove(@NotNull Throwable e) { + onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e); + } + }); + } + + return futs; + } + + /** + * Schema change routine. + * + * @param tbls Tables to change schema for. + * @return Schema update futures. + */ + private List<CompletableFuture<Boolean>> changeSchema(Set<String> tbls) { + boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr); + + List<CompletableFuture<Boolean>> futs = new ArrayList<>(); + + for (String tblName : tbls) { + TableImpl tbl = tables.get(tblName); + + UUID tblId = tbl.tableId(); + + final int ver = tbl.schemaView().lastSchemaVersion() + 1; + + if (hasMetastorageLocally) + futs.add(schemaMgr.updateSchemaForTable(tblId, tblName)); + + final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>(); + + CompletableFuture.allOf(schemaReadyFut) + .exceptionally(e -> { + LOG.error("Failed to upgrade schema for a table [name=" + tblName + ", id=" + tblId + ']', e); + + onEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), e); + + return null; + }) + .thenRun(() -> + onEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), null) + ); + + schemaMgr.listen(SchemaEvent.CHANGED, new EventListener<>() { + @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) { + if (!tblId.equals(parameters.tableId()) && parameters.schemaRegistry().lastSchemaVersion() < ver) + return false; + + if (e == null) +
schemaReadyFut.complete(parameters); + else + schemaReadyFut.completeExceptionally(e); + + return true; + } + + @Override public void remove(@NotNull Throwable e) { + schemaReadyFut.completeExceptionally(e); + } + }); + } + + return futs; } /** {@inheritDoc} */ @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) { CompletableFuture<Table> tblFut = new CompletableFuture<>(); - listen(TableEvent.CREATE, new EventListener<TableEventParameters>() { + listen(TableEvent.CREATE, new EventListener<>() { @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) { String tableName = parameters.tableName(); @@ -362,10 +469,49 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } /** {@inheritDoc} */ + @Override public void alterTable(String name, Consumer<TableChange> tableChange) { + CompletableFuture<Void> tblFut = new CompletableFuture<>(); + + listen(TableEvent.ALTER, new EventListener<>() { + @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) { + String tableName = parameters.tableName(); + + if (!name.equals(tableName)) + return false; + + if (e == null) { + tblFut.complete(null); + } + else + tblFut.completeExceptionally(e); + + return true; + } + + @Override public void remove(@NotNull Throwable e) { + tblFut.completeExceptionally(e); + } + }); + + try { + configurationMgr.configurationRegistry() + .getConfiguration(TablesConfiguration.KEY).tables().change(change -> + change.update(name, tableChange)).get(); + } + catch (InterruptedException | ExecutionException e) { + LOG.error("Table wasn't created [name=" + name + ']', e); + + tblFut.completeExceptionally(e); + } + + tblFut.join(); + } + + /** {@inheritDoc} */ @Override public void dropTable(String name) { CompletableFuture<Void> dropTblFut = new CompletableFuture<>(); - listen(TableEvent.DROP, new EventListener<TableEventParameters>() { + listen(TableEvent.DROP, 
new EventListener<>() { @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) { String tableName = parameters.tableName(); @@ -428,7 +574,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private HashSet<String> tableNamesConfigured() { IgniteBiTuple<ByteArray, ByteArray> range = toRange(new ByteArray(PUBLIC_PREFIX)); - HashSet tableNames = new HashSet(); + HashSet<String> tableNames = new HashSet<>(); try (Cursor<Entry> cursor = metaStorageMgr.range(range.get1(), range.get2())) { while (cursor.hasNext()) { @@ -438,7 +584,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp int idx = -1; - while ((idx = keyTail.indexOf('.', idx + 1)) > 0 && keyTail.charAt(idx - 1) == '\\'); + //noinspection StatementWithEmptyBody + while ((idx = keyTail.indexOf('.', idx + 1)) > 0 && keyTail.charAt(idx - 1) == '\\') + ; String tablName = keyTail.substring(0, idx); @@ -476,7 +624,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp CompletableFuture<Table> getTblFut = new CompletableFuture<>(); - EventListener<TableEventParameters> clo = new EventListener<TableEventParameters>() { + EventListener<TableEventParameters> clo = new EventListener<>() { @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) { if (e instanceof ListenerRemovedException) { getTblFut.completeExceptionally(e); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java index f2a737e..94d508a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java @@ -26,6 +26,9 @@ public enum TableEvent implements Event { /** This event is fired when a table was created. 
*/ CREATE, + /** This event is fired when a table config was changed. */ + ALTER, + /** This event is fired when a table was dropped. */ DROP } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java index f9eea5b..d5ddca8 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java @@ -53,8 +53,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -88,8 +88,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -124,8 +124,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -157,8 +157,8 @@ public class 
TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -188,8 +188,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -240,8 +240,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -276,8 +276,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] {new Column("val", NativeTypes.LONG, false)} + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{new Column("val", NativeTypes.LONG, false)} ); Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema)); @@ -313,8 +313,8 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new 
Column[] { + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{ new Column("val", NativeTypes.LONG, true), new Column("str", NativeTypes.stringOf(3), true), new Column("blob", NativeTypes.blobOf(3), true) @@ -326,7 +326,7 @@ public class TableBinaryViewOperationsTest { final Tuple keyTuple0 = new TestTupleBuilder().set("id", 0).set("id1", 0).build(); final Tuple keyTuple1 = new TestTupleBuilder().set("id1", 0).build(); final Tuple tuple0 = new TestTupleBuilder().set("id", 1L).set("str", "qweqweqwe").set("val", 11L).build(); - final Tuple tuple1 = new TestTupleBuilder().set("id", 1L).set("blob", new byte[] {0, 1, 2, 3}).set("val", 22L).build(); + final Tuple tuple1 = new TestTupleBuilder().set("id", 1L).set("blob", new byte[]{0, 1, 2, 3}).set("val", 22L).build(); assertThrows(InvalidTypeException.class, () -> tbl.get(keyTuple0)); assertThrows(IllegalArgumentException.class, () -> tbl.get(keyTuple1)); @@ -349,11 +349,11 @@ public class TableBinaryViewOperationsTest { SchemaDescriptor schema = new SchemaDescriptor( tableId, 1, - new Column[] {new Column("id", NativeTypes.LONG, false)}, - new Column[] { + new Column[]{new Column("id", NativeTypes.LONG, false)}, + new Column[]{ new Column("val", NativeTypes.LONG, true, () -> 28L), new Column("str", NativeTypes.stringOf(3), true, () -> "ABC"), - new Column("blob", NativeTypes.blobOf(3), true, () -> new byte[] {0, 1, 2}) + new Column("blob", NativeTypes.blobOf(3), true, () -> new byte[]{0, 1, 2}) } ); @@ -363,7 +363,7 @@ public class TableBinaryViewOperationsTest { final Tuple keyTuple1 = tbl.tupleBuilder().set("id", 1L).build(); final Tuple tuple0 = tbl.tupleBuilder().set("id", 0L).build(); - final Tuple tupleExpected0 = tbl.tupleBuilder().set("id", 0L).set("val", 28L).set("str", "ABC").set("blob", new byte[] {0, 1, 2}).build(); + final Tuple tupleExpected0 = tbl.tupleBuilder().set("id", 0L).set("val", 28L).set("str", "ABC").set("blob", new byte[]{0, 1, 2}).build(); final Tuple tuple1 = 
tbl.tupleBuilder().set("id", 1L).set("val", null).set("str", null).set("blob", null).build(); tbl.insert(tuple0); @@ -427,7 +427,7 @@ public class TableBinaryViewOperationsTest { if (val1 instanceof byte[] && val2 instanceof byte[]) Assertions.assertArrayEquals((byte[])val1, (byte[])val2, "Equality check failed: colIdx=" + col.schemaIndex()); else - Assertions.assertEquals(val1, val2, "Equality check failed: colIdx=" + col.schemaIndex()); + Assertions.assertEquals(val1, val2, "Equality check failed: colIdx=" + col.schemaIndex()); } } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java index 9ecaec4..aa47bab 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.impl; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.Row; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.SchemaRegistry; import org.jetbrains.annotations.NotNull; @@ -52,4 +54,16 @@ public class DummySchemaManagerImpl implements SchemaRegistry { return schema; } + + /** {@inheritDoc} */ + @Override public int lastSchemaVersion() { + return schema.version(); + } + + /** {@inheritDoc} */ + @Override public Row resolve(BinaryRow row) { + assert row.schemaVersion() == schema.version(); + + return new Row(schema, row); + } }