This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f82358  IGNITE-14863: Schema evolution. Add and remove column. (#173)
7f82358 is described below

commit 7f82358fa7aeead945991e7e5fcdd9b9d8ffab44
Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com>
AuthorDate: Mon Jul 5 17:58:10 2021 +0300

    IGNITE-14863: Schema evolution. Add and remove column. (#173)
---
 .../apache/ignite/table/manager/IgniteTables.java  |   9 +-
 .../configuration/processor/Processor.java         |   4 +-
 .../runner/app/AbstractSchemaChangeTest.java       | 158 ++++++++++
 .../runner/app/DynamicTableCreationTest.java       |  18 +-
 .../runner/app/SchemaChangeKVViewTest.java         | 119 ++++++++
 .../runner/app/SchemaChangeTableViewTest.java      | 116 +++++++
 .../org/apache/ignite/internal/schema/Column.java  |  16 +-
 .../org/apache/ignite/internal/schema/Row.java     |  83 +++--
 .../ignite/internal/schema/SchemaManager.java      |  51 +++-
 .../ignite/internal/schema/SchemaRegistry.java     |  23 +-
 .../configuration/SchemaDescriptorConverter.java   |  38 ++-
 .../ignite/internal/schema/event/SchemaEvent.java  |   3 +
 .../internal/schema/registry/ColumnMapping.java    |  64 ++++
 .../schema/registry/SchemaRegistryImpl.java        |  73 +++--
 .../schema/registry/UpgradingRowAdapter.java       |  53 ++++
 .../ignite/distributed/ITDistributedTableTest.java |   9 +
 .../ignite/internal/table/KVBinaryViewImpl.java    |   5 +-
 .../apache/ignite/internal/table/TableImpl.java    |   5 +-
 .../org/apache/ignite/internal/table/TableRow.java |  14 +-
 .../internal/table/distributed/TableManager.java   | 336 +++++++++++++++------
 .../ignite/internal/table/event/TableEvent.java    |   3 +
 .../table/TableBinaryViewOperationsTest.java       |  44 +--
 .../table/impl/DummySchemaManagerImpl.java         |  14 +
 23 files changed, 1054 insertions(+), 204 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java 
b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index 780e9f1..aab5f3e 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -31,7 +31,6 @@ import org.apache.ignite.table.Table;
 public interface IgniteTables {
     /**
      * Creates a cluster table.
-     * The table changes if already exists.
      *
      * @param name Table name.
      * @param tableInitChange Table changer.
@@ -40,6 +39,14 @@ public interface IgniteTables {
     Table createTable(String name, Consumer<TableChange> tableInitChange);
 
     /**
+     * Alter a cluster table.
+     *
+     * @param name Table name.
+     * @param tableChange Table changer.
+     */
+    void alterTable(String name, Consumer<TableChange> tableChange);
+
+    /**
      * Drops a table with the name specified.
      *
      * @param name Table name.
diff --git 
a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java
 
b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java
index d0ef7e3..43ab153 100644
--- 
a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java
+++ 
b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/internal/configuration/processor/Processor.java
@@ -300,6 +300,7 @@ public class Processor extends AbstractProcessor {
             .addModifiers(PUBLIC);
 
         TypeSpec.Builder changeClsBuilder = 
TypeSpec.interfaceBuilder(changeClsName)
+            .addSuperinterface(viewClsName)
             .addModifiers(PUBLIC);
 
         ClassName consumerClsName = ClassName.get(Consumer.class);
@@ -337,9 +338,6 @@ public class Processor extends AbstractProcessor {
                     .returns(viewFieldType);
 
                 viewClsBuilder.addMethod(getMtdBuilder.build());
-
-                if (valAnnotation != null)
-                    changeClsBuilder.addMethod(getMtdBuilder.build());
             }
 
             {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
new file mode 100644
index 0000000..c2c560a
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.internal.app.IgnitionCleaner;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.Column;
+import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.SchemaTable;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Base class for schema change tests: starts a cluster and provides helpers to create a table and add or drop columns.
+ */
+abstract class AbstractSchemaChangeTest {
+    /** Table name. */
+    public static final String TABLE = "PUBLIC.tbl1";
+
+    /** Nodes bootstrap configuration. */
+    private final Map<String, String> nodesBootstrapCfg = new 
LinkedHashMap<>() {{
+        put("node0", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3344,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+
+        put("node1", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3345,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+
+        put("node2", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3346,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+    }};
+
+    /** Cluster nodes. */
+    private final List<Ignite> clusterNodes = new ArrayList<>();
+
+    /**
+     *
+     */
+    @BeforeAll
+    static void beforeAll() throws Exception {
+        IgnitionCleaner.removeAllData();
+    }
+
+    /**
+     *
+     */
+    @AfterEach
+    void afterEach() throws Exception {
+        IgniteUtils.closeAll(clusterNodes);
+
+        IgnitionCleaner.removeAllData();
+    }
+
+    /**
+     * @return Grid nodes.
+     */
+    @NotNull protected List<Ignite> startGrid() {
+        List<Ignite> clusterNodes = new ArrayList<>();
+
+        for (Map.Entry<String, String> nodeBootstrapCfg : 
nodesBootstrapCfg.entrySet())
+            clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(), 
nodeBootstrapCfg.getValue()));
+
+        assertEquals(3, clusterNodes.size());
+        return clusterNodes;
+    }
+
+    /**
+     * @param nodes Cluster nodes.
+     */
+    @NotNull protected void createTable(List<Ignite> nodes) {
+        // Create table on node 0.
+        SchemaTable schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", 
"tbl1").columns(
+            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+            SchemaBuilders.column("valInt", 
ColumnType.INT32).asNullable().build(),
+            SchemaBuilders.column("valStr", 
ColumnType.string()).withDefaultValue("default").build()
+        ).withPrimaryKey("key").build();
+
+        nodes.get(0).tables().createTable(
+            schTbl1.canonicalName(),
+            tblCh -> convert(schTbl1, 
tblCh).changeReplicas(1).changePartitions(10)
+        );
+    }
+
+    /**
+     * @param nodes Cluster nodes.
+     * @param columnToAdd Column to add.
+     */
+    protected void addColumn(List<Ignite> nodes, Column columnToAdd) {
+        nodes.get(0).tables().alterTable(TABLE,
+            chng -> chng.changeColumns(cols -> {
+                final int colIdx = chng.columns().size();
+                //TODO: avoid 'colIdx' or replace with correct last colIdx.
+                cols.create(String.valueOf(colIdx), colChg -> 
convert(columnToAdd, colChg));
+            }));
+    }
+
+    /**
+     * @param nodes Cluster nodes.
+     * @param colName Name of column to drop.
+     */
+    protected void dropColumn(List<Ignite> nodes, String colName) {
+        nodes.get(0).tables().alterTable(TABLE,
+            chng -> chng.changeColumns(cols -> {
+                cols.delete(chng.columns().namedListKeys().stream()
+                    .filter(key -> 
colName.equals(chng.columns().get(key).name()))
+                    .findAny()
+                    .orElseThrow(() -> {
+                        throw new IllegalStateException("Column not found.");
+                    }));
+            }));
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
index 6ca33a2..14f9122 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.app.IgnitionManager;
 import org.apache.ignite.internal.app.IgnitionCleaner;
 import org.apache.ignite.internal.schema.SchemaManager;
 import 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.table.SchemaMismatchException;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.schema.ColumnType;
@@ -131,8 +132,8 @@ class DynamicTableCreationTest {
         final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", 1L).build();
         final Tuple keyTuple2 = kvView2.tupleBuilder().set("key", 2L).build();
 
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple1).value("key"));
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple1).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple1).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple1).value("key"));
         assertEquals(1, (Long)tbl2.get(keyTuple1).value("key"));
         assertEquals(2, (Long)tbl2.get(keyTuple2).value("key"));
 
@@ -140,6 +141,11 @@ class DynamicTableCreationTest {
         assertEquals(111, (Integer)kvView2.get(keyTuple1).value("val"));
         assertEquals(222, (Integer)tbl2.get(keyTuple2).value("val"));
         assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val"));
+
+        assertThrows(SchemaMismatchException.class, () -> 
tbl1.get(keyTuple1).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView1.get(keyTuple1).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
tbl1.get(keyTuple1).value("val"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView1.get(keyTuple1).value("val"));
     }
 
     /**
@@ -193,10 +199,10 @@ class DynamicTableCreationTest {
         final Tuple keyTuple2 = tbl2.tupleBuilder().set("key", 
uuid2).set("affKey", 4242L).build();
 
         // KV view must NOT return key columns in value.
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple1).value("key"));
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple1).value("affKey"));
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple2).value("key"));
-        assertThrows(IllegalArgumentException.class, () -> 
kvView2.get(keyTuple2).value("affKey"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple1).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple1).value("affKey"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple2).value("key"));
+        assertThrows(SchemaMismatchException.class, () -> 
kvView2.get(keyTuple2).value("affKey"));
 
         // Record binary view MUST return key columns in value.
         assertEquals(uuid, tbl2.get(keyTuple1).value("key"));
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
new file mode 100644
index 0000000..e0c2101
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.List;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.table.ColumnNotFoundException;
+import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.table.KeyValueBinaryView;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Schema change tests for the key-value binary view.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14581")
+class SchemaChangeKVViewTest extends AbstractSchemaChangeTest {
+    /**
+     * Check drop column from table schema.
+     */
+    @Test
+    public void testDropColumn() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView();
+
+        {
+            kvView.put(kvView.tupleBuilder().set("key", 1L).build(),
+                kvView.tupleBuilder().set("valInt", 111).set("valStr", 
"str").build());
+        }
+
+        dropColumn(grid, "valStr");
+
+        {
+            // Check old row conversion.
+            final Tuple keyTuple = kvView.tupleBuilder().set("key", 
1L).build();
+
+            assertEquals(111, (Integer)kvView.get(keyTuple).value("valInt"));
+            assertThrows(ColumnNotFoundException.class, () -> 
kvView.get(keyTuple).value("valStr"));
+
+            // Check tuple of outdated schema.
+            assertThrows(ColumnNotFoundException.class, () -> kvView.put(
+                kvView.tupleBuilder().set("key", 2L).build(),
+                kvView.tupleBuilder().set("valInt", -222).set("valStr", 
"str").build())
+            );
+
+            // Check tuple of correct schema.
+            kvView.put(kvView.tupleBuilder().set("key", 2L).build(), 
kvView.tupleBuilder().set("valInt", 222).build());
+
+            final Tuple keyTuple2 = kvView.tupleBuilder().set("key", 
2L).build();
+
+            assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt"));
+            assertThrows(ColumnNotFoundException.class, () -> 
kvView.get(keyTuple2).value("valStr"));
+        }
+    }
+
+
+    /**
+     * Check add a new column to table schema.
+     */
+    @Test
+    public void testAddNewColumn() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView();
+
+        {
+            kvView.put(kvView.tupleBuilder().set("key", 1L).build(), 
kvView.tupleBuilder().set("valInt", 111).build());
+
+            assertThrows(ColumnNotFoundException.class, () -> kvView.put(
+                kvView.tupleBuilder().set("key", 1L).build(),
+                kvView.tupleBuilder().set("valInt", -111).set("valStrNew", 
"str").build())
+            );
+        }
+
+        addColumn(grid, SchemaBuilders.column("valStrNew", 
ColumnType.string()).asNullable().withDefaultValue("default").build());
+
+        {
+            // Check old row conversion.
+            Tuple keyTuple = kvView.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(111, (Integer)kvView.get(keyTuple).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple).value("valStrNew"));
+
+            // Check tuple of new schema.
+            kvView.put(kvView.tupleBuilder().set("key", 2L).build(),
+                kvView.tupleBuilder().set("valInt", 222).set("valStrNew", 
"str").build());
+
+            Tuple keyTuple2 = kvView.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt"));
+            assertEquals("str", kvView.get(keyTuple2).value("valStrNew"));
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
new file mode 100644
index 0000000..d68ea3f
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.List;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.table.ColumnNotFoundException;
+import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Schema change tests for the table view.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14581")
+class SchemaChangeTableViewTest extends AbstractSchemaChangeTest {
+    /**
+     * Check drop column from table schema.
+     */
+    @Test
+    public void testDropColumn() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        final Table tbl = grid.get(1).tables().table(TABLE);
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 
111).set("valStr", "str").build());
+        }
+
+        dropColumn(grid, "valStr");
+
+        {
+            // Check old row conversion.
+            final Tuple keyTuple = tbl.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(1, (Long)tbl.get(keyTuple).value("key"));
+            assertEquals(111, (Integer)tbl.get(keyTuple).value("valInt"));
+            assertThrows(ColumnNotFoundException.class, () -> 
tbl.get(keyTuple).value("valStr"));
+
+            // Check tuple of outdated schema.
+            assertThrows(ColumnNotFoundException.class,
+                () -> tbl.insert(tbl.tupleBuilder().set("key", 
2L).set("valInt", -222).set("valStr", "str").build())
+            );
+
+            // Check tuple of correct schema.
+            tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 
222).build());
+
+            final Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(2, (Long)tbl.get(keyTuple2).value("key"));
+            assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt"));
+            assertThrows(ColumnNotFoundException.class, () -> 
tbl.get(keyTuple2).value("valStr"));
+        }
+    }
+
+    /**
+     * Check add a new column to table schema.
+     */
+    @Test
+    public void testAddNewColumn() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Table tbl = grid.get(1).tables().table(TABLE);
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 
111).build());
+
+            assertThrows(ColumnNotFoundException.class,
+                () -> tbl.insert(tbl.tupleBuilder().set("key", 
1L).set("valInt", -111).set("valStrNew", "str").build())
+            );
+        }
+
+        addColumn(grid, SchemaBuilders.column("valStrNew", 
ColumnType.string()).asNullable().withDefaultValue("default").build());
+
+        // Check old row conversion.
+        Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build();
+
+        assertEquals(1, (Long)tbl.get(keyTuple1).value("key"));
+        assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt"));
+        assertEquals("default", tbl.get(keyTuple1).value("valStrNew"));
+
+        // Check tuple of new schema.
+        tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 
222).set("valStrNew", "str").build());
+
+        Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build();
+
+        assertEquals(2, (Long)tbl.get(keyTuple2).value("key"));
+        assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt"));
+        assertEquals("str", tbl.get(keyTuple2).value("valStrNew"));
+    }
+}
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
index 815f469..72f778d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.schema;
 
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
 import java.io.Serializable;
 import java.util.function.Supplier;
-import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Column description for a type schema. Column contains a column name, a 
column type and a nullability flag.
@@ -75,7 +75,7 @@ public class Column implements Comparable<Column>, 
Serializable {
         String name,
         NativeType type,
         boolean nullable,
-        @NotNull Supplier<Object> defValSup
+        @Nullable Supplier<Object> defValSup
     ) {
         this(-1, name, type, nullable, defValSup);
     }
@@ -87,12 +87,12 @@ public class Column implements Comparable<Column>, 
Serializable {
      * @param nullable If {@code false}, null values will not be allowed for 
this column.
      * @param defValSup Default value supplier.
      */
-    Column(
+    private Column(
         int schemaIndex,
         String name,
         NativeType type,
         boolean nullable,
-        @NotNull Supplier<Object> defValSup
+        @Nullable Supplier<Object> defValSup
     ) {
         this.schemaIndex = schemaIndex;
         this.name = name;
@@ -137,9 +137,10 @@ public class Column implements Comparable<Column>, 
Serializable {
     public Object defaultValue() {
         Object val = defValSup.get();
 
-        assert nullable || val != null : "Null value is not accepted for not 
nullable column: [col=" + this + ']';
+        if (nullable || val != null)
+            return val;
 
-        return val;
+        throw new IllegalStateException("Null value is not accepted for not 
nullable column: [col=" + this + ']');
     }
 
     /** {@inheritDoc} */
@@ -173,6 +174,7 @@ public class Column implements Comparable<Column>, 
Serializable {
 
     /**
      * Validate the object by column's constraint.
+     *
      * @param val Object to validate.
      */
     public void validate(Object val) {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
index 973e8b2..82aab99 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
@@ -33,7 +33,7 @@ import java.util.UUID;
  */
 public class Row implements BinaryRow {
     /** Schema descriptor. */
-    private final SchemaDescriptor schema;
+    protected final SchemaDescriptor schema;
 
     /** Binary row. */
     private final BinaryRow row;
@@ -53,8 +53,6 @@ public class Row implements BinaryRow {
      * @param row Binary row representation.
      */
     public Row(SchemaDescriptor schema, BinaryRow row) {
-        assert row.schemaVersion() == schema.version();
-
         this.row = row;
         this.schema = schema;
     }
@@ -83,7 +81,10 @@ public class Row implements BinaryRow {
     public byte byteValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.BYTE);
 
-        return off < 0 ? 0 : readByte(offset(off));
+        if (off < 0)
+            return off == -1 ? 0 : (byte)schema.column(col).defaultValue();
+
+        return readByte(offset(off));
     }
 
     /**
@@ -96,7 +97,10 @@ public class Row implements BinaryRow {
     public Byte byteValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.BYTE);
 
-        return off < 0 ? null : readByte(offset(off));
+        if (off < 0)
+            return off == -1 ? null : (Byte)schema.column(col).defaultValue();
+
+        return readByte(offset(off));
     }
 
     /**
@@ -109,7 +113,10 @@ public class Row implements BinaryRow {
     public short shortValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.SHORT);
 
-        return off < 0 ? 0 : readShort(offset(off));
+        if (off < 0)
+            return off == -1 ? 0 : (short)schema.column(col).defaultValue();
+
+        return readShort(offset(off));
     }
 
     /**
@@ -122,7 +129,10 @@ public class Row implements BinaryRow {
     public Short shortValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.SHORT);
 
-        return off < 0 ? null : readShort(offset(off));
+        if (off < 0)
+            return off == -1 ? null : (Short)schema.column(col).defaultValue();
+
+        return readShort(offset(off));
     }
 
     /**
@@ -135,7 +145,10 @@ public class Row implements BinaryRow {
     public int intValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.INTEGER);
 
-        return off < 0 ? 0 : readInteger(offset(off));
+        if (off < 0)
+            return off == -1 ? 0 : (int)schema.column(col).defaultValue();
+
+        return readInteger(offset(off));
     }
 
     /**
@@ -148,7 +161,10 @@ public class Row implements BinaryRow {
     public Integer intValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.INTEGER);
 
-        return off < 0 ? null : readInteger(offset(off));
+        if (off < 0)
+            return off == -1 ? null : 
(Integer)schema.column(col).defaultValue();
+
+        return readInteger(offset(off));
     }
 
     /**
@@ -161,7 +177,10 @@ public class Row implements BinaryRow {
     public long longValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.LONG);
 
-        return off < 0 ? 0 : readLong(offset(off));
+        if (off < 0)
+            return off == -1 ? 0L : (long)schema.column(col).defaultValue();
+
+        return readLong(offset(off));
     }
 
     /**
@@ -174,7 +193,10 @@ public class Row implements BinaryRow {
     public Long longValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.LONG);
 
-        return off < 0 ? null : readLong(offset(off));
+        if (off < 0)
+            return off == -1 ? null : (Long)schema.column(col).defaultValue();
+
+        return readLong(offset(off));
     }
 
     /**
@@ -187,7 +209,10 @@ public class Row implements BinaryRow {
     public float floatValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.FLOAT);
 
-        return off < 0 ? 0.f : readFloat(offset(off));
+        if (off < 0)
+            return off == -1 ? 0.f : (float)schema.column(col).defaultValue();
+
+        return readFloat(offset(off));
     }
 
     /**
@@ -200,7 +225,10 @@ public class Row implements BinaryRow {
     public Float floatValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.FLOAT);
 
-        return off < 0 ? null : readFloat(offset(off));
+        if (off < 0)
+            return off == -1 ? null : (Float)schema.column(col).defaultValue();
+
+        return readFloat(offset(off));
     }
 
     /**
@@ -213,7 +241,10 @@ public class Row implements BinaryRow {
     public double doubleValue(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.DOUBLE);
 
-        return off < 0 ? 0.d : readDouble(offset(off));
+        if (off < 0)
+            return off == -1 ? 0.d : (double)schema.column(col).defaultValue();
+
+        return readDouble(offset(off));
     }
 
     /**
@@ -226,7 +257,10 @@ public class Row implements BinaryRow {
     public Double doubleValueBoxed(int col) throws InvalidTypeException {
         long off = findColumn(col, NativeTypeSpec.DOUBLE);
 
-        return off < 0 ? null : readDouble(offset(off));
+        if (off < 0)
+            return off == -1 ? null : 
(Double)schema.column(col).defaultValue();
+
+        return readDouble(offset(off));
     }
 
     /**
@@ -252,7 +286,7 @@ public class Row implements BinaryRow {
         long offLen = findColumn(col, NativeTypeSpec.STRING);
 
         if (offLen < 0)
-            return null;
+            return offLen == -1 ? null : 
(String)rowSchema().column(col).defaultValue();
 
         int off = offset(offLen);
         int len = length(offLen);
@@ -271,7 +305,7 @@ public class Row implements BinaryRow {
         long offLen = findColumn(col, NativeTypeSpec.BYTES);
 
         if (offLen < 0)
-            return null;
+            return offLen == -1 ? null : 
(byte[])schema.column(col).defaultValue();
 
         int off = offset(offLen);
         int len = length(offLen);
@@ -287,12 +321,12 @@ public class Row implements BinaryRow {
      * @throws InvalidTypeException If actual column type does not match the 
requested column type.
      */
     public UUID uuidValue(int col) throws InvalidTypeException {
-        long found = findColumn(col, NativeTypeSpec.UUID);
+        long offLen = findColumn(col, NativeTypeSpec.UUID);
 
-        if (found < 0)
-            return null;
+        if (offLen < 0)
+            return offLen == -1 ? null : 
(UUID)schema.column(col).defaultValue();
 
-        int off = offset(found);
+        int off = offset(offLen);
 
         long lsb = readLong(off);
         long msb = readLong(off + 8);
@@ -311,7 +345,7 @@ public class Row implements BinaryRow {
         long offLen = findColumn(col, NativeTypeSpec.BITMASK);
 
         if (offLen < 0)
-            return null;
+            return offLen == -1 ? null : 
(BitSet)schema.column(col).defaultValue();
 
         int off = offset(offLen);
         int len = columnLength(col);
@@ -440,7 +474,8 @@ public class Row implements BinaryRow {
      * @param hasNullMap Has null map flag.
      * @return Encoded offset (from the row start) and length of the column 
with the given index.
      */
-    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int 
idx, boolean hasVarTbl, boolean hasNullMap) {
+    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int 
idx, boolean hasVarTbl,
+        boolean hasNullMap) {
         int vartableOff = baseOff + CHUNK_LEN_FIELD_SIZE;
 
         int numNullsBefore = 0;
@@ -560,7 +595,7 @@ public class Row implements BinaryRow {
 
     /** {@inheritDoc} */
     @Override public int schemaVersion() {
-        return row.schemaVersion();
+        return schema.version();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index efc101b..62a35cb 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -53,7 +53,7 @@ import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
- *
+ * <p>
  * Schemas MUST be registered in a version ascending order incrementing by 
{@code 1} with NO gaps,
  * otherwise an exception will be thrown. The version numbering starts from 
the {@code 1}.
  * <p>
@@ -113,7 +113,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
                     if (verPos == -1) {
                         final UUID tblId = UUID.fromString(keyTail);
 
-                        SchemaRegistry reg = schemaRegistryForTable(tblId);
+                        SchemaRegistryImpl reg = schemaRegistryForTable(tblId);
 
                         assert reg != null : "Table schema was not initialized 
or table has been dropped: " + tblId;
 
@@ -124,6 +124,13 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
 
                             onEvent(SchemaEvent.DROPPED, new 
SchemaEventParameters(tblId, null), null);
                         }
+                        else {
+                            int v = 
(int)ByteUtils.bytesToLong(evt.newEntry().value(),0);
+
+                            assert reg.lastSchemaVersion() == v;
+
+                            onEvent(SchemaEvent.CHANGED, new 
SchemaEventParameters(tblId, reg), null);
+                        }
 
                         return true; // Ignore last table schema version.
                     }
@@ -191,6 +198,42 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     }
 
     /**
+     * Creates schema registry for the table with an existing schema or
+     * registers initial schema from configuration.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, 
String tblName) {
+        return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = 
configurationMgr.configurationRegistry().
+                    
getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+
+                assert !entry.empty();
+
+                final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 
0);
+                final int newVer = oldVer + 1;
+
+                final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + 
tblId);
+                final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + 
tblId + INTERNAL_VER_SUFFIX + newVer);
+
+                SchemaTable schemaTable = 
SchemaConfigurationConverter.convert(tblConfig);
+                final SchemaDescriptor desc = 
SchemaDescriptorConverter.convert(tblId, newVer, schemaTable);
+
+                return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
+                    Operations.put(schemaKey, ByteUtils.toBytes(desc)),
+                    Operations.noop())
+                    //TODO: IGNITE-14679 Serialize schema.
+                    .thenCompose(res -> metaStorageMgr.invoke(
+                        
Conditions.value(lastVerKey).eq(ByteUtils.longToBytes(oldVer)),
+                        Operations.put(lastVerKey, 
ByteUtils.longToBytes(newVer)),
+                        Operations.noop()));
+            });
+    }
+
+    /**
      * Return table schema of certain version from history.
      *
      * @param tblId Table id.
@@ -231,8 +274,8 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * @param tableId Table id.
      * @return Schema registry for the table.
      */
-    private SchemaRegistry schemaRegistryForTable(UUID tableId) {
-        final SchemaRegistry reg = schemaRegs.get(tableId);
+    private SchemaRegistryImpl schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistryImpl reg = schemaRegs.get(tableId);
 
         if (reg == null)
             throw new SchemaRegistryException("No schema was ever registered 
for the table: " + tableId);
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index 777adf1..ea1ad4e 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -25,14 +25,29 @@ import org.jetbrains.annotations.NotNull;
  */
 public interface SchemaRegistry {
     /**
-     * @return Current schema.
+     * Gets schema descriptor for the latest version if initialized.
+     *
+     * @return Schema descriptor if initialized, {@code null} otherwise.
      */
     SchemaDescriptor schema();
 
     /**
-     * @param ver Schema version.
-     * @return Schema of given version.
-     * @throws SchemaRegistryException If schema was not found.
+     * @return Last registered schema version.
+     */
+    public int lastSchemaVersion();
+
+    /**
+     * Gets schema descriptor for given version.
+     *
+     * @param ver Schema version to get descriptor for.
+     * @return Schema descriptor of given version.
+     * @throws SchemaRegistryException If no schema found for given version.
      */
     @NotNull SchemaDescriptor schema(int ver) throws SchemaRegistryException;
+
+    /**
+     * @param row Binary row.
+     * @return Schema-aware row.
+     */
+    Row resolve(BinaryRow row);
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java
index 29f288f..400d0c3 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDescriptorConverter.java
@@ -17,19 +17,19 @@
 
 package org.apache.ignite.internal.schema.configuration;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-
 import org.apache.ignite.schema.ColumnType;
 import org.apache.ignite.schema.SchemaTable;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
 import static org.apache.ignite.internal.schema.NativeTypes.BYTE;
 import static org.apache.ignite.internal.schema.NativeTypes.DOUBLE;
 import static org.apache.ignite.internal.schema.NativeTypes.FLOAT;
@@ -86,7 +86,7 @@ public class SchemaDescriptorConverter {
                 return UUID;
 
             case BITMASK:
-                return NativeTypes.bitmaskOf(((ColumnType.VarLenColumnType) 
colType).length());
+                return 
NativeTypes.bitmaskOf(((ColumnType.VarLenColumnType)colType).length());
 
             case STRING:
                 int strLen = ((ColumnType.VarLenColumnType)colType).length();
@@ -116,7 +116,7 @@ public class SchemaDescriptorConverter {
      * @return Internal Column.
      */
     private static Column convert(org.apache.ignite.schema.Column colCfg) {
-        return new Column(colCfg.name(), convert(colCfg.type()), 
colCfg.nullable());
+        return new Column(colCfg.name(), convert(colCfg.type()), 
colCfg.nullable(), new ConstantSupplier((Serializable)colCfg.defaultValue()));
     }
 
     /**
@@ -132,7 +132,7 @@ public class SchemaDescriptorConverter {
 
         Column[] keyCols = new Column[keyColsCfg.size()];
 
-        for (int i = 0;i < keyCols.length;i++)
+        for (int i = 0; i < keyCols.length; i++)
             keyCols[i] = convert(keyColsCfg.get(i));
 
         String[] affCols = 
tblCfg.affinityColumns().stream().map(org.apache.ignite.schema.Column::name)
@@ -142,9 +142,29 @@ public class SchemaDescriptorConverter {
 
         Column[] valCols = new Column[valColsCfg.size()];
 
-        for (int i = 0;i < valCols.length;i++)
+        for (int i = 0; i < valCols.length; i++)
             valCols[i] = convert(valColsCfg.get(i));
 
         return new SchemaDescriptor(tblId, schemaVer, keyCols, affCols, 
valCols);
     }
+
+    /**
+     * Constant value supplier.
+     */
+    private static class ConstantSupplier implements Supplier<Object>, 
Serializable {
+        /** Value. */
+        private final Serializable val;
+
+        /**
+         * @param val Value.
+         */
+        ConstantSupplier(Serializable val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get() {
+            return val;
+        }
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
index d81e0d2..6d27985 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
@@ -26,6 +26,9 @@ public enum SchemaEvent implements Event {
     /** This event is fired when a schema was initialized. */
     INITIALIZED,
 
+    /** This event is fired when a schema was changed. */
+    CHANGED,
+
     /** This event is fired when a schema was dropped. */
     DROPPED
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
new file mode 100644
index 0000000..dbb95a0
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.registry;
+
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+/**
+ * Column mapping.
+ */
+class ColumnMapping {
+    /** Mapping. */
+    private final int[] mapping;
+
+    /** First mapped column index. */
+    private final int firstColIdx;
+
+    /**
+     * @param schema Source schema descriptor.
+     */
+    ColumnMapping(SchemaDescriptor schema) {
+        firstColIdx = schema.keyColumns().length();
+        mapping = new int[schema.valueColumns().length()];
+    }
+
+    /**
+     * Add column mapping.
+     *
+     * @param from Column index in source schema.
+     * @param to Column index in schema.
+     */
+    void add(int from, int to) {
+        assert from >= firstColIdx && from < firstColIdx + mapping.length;
+
+        mapping[from - firstColIdx] = to;
+    }
+
+    /**
+     * Gets mapped column idx.
+     *
+     * @param idx Column index in source.
+     * @return Column index in target schema.
+     */
+    int map(int idx) {
+        if (idx < firstColIdx)
+            return idx;
+
+        return mapping[idx - firstColIdx];
+    }
+}
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index f9d9e64..a2fac90 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,8 +17,14 @@
 
 package org.apache.ignite.internal.schema.registry;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Columns;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -60,13 +66,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         this.history = history;
     }
 
-    /**
-     * Gets schema descriptor for given version.
-     *
-     * @param ver Schema version to get descriptor for.
-     * @return Schema descriptor.
-     * @throws SchemaRegistryException If no schema found for given version.
-     */
+    /** {@inheritDoc} */
     @Override public SchemaDescriptor schema(int ver) {
         SchemaDescriptor desc = schemaCache.get(ver);
 
@@ -87,26 +87,65 @@ public class SchemaRegistryImpl implements SchemaRegistry {
             throw new SchemaRegistryException("Failed to find schema: ver=" + 
ver);
     }
 
-    /**
-     * Gets schema descriptor for the latest version if initialized.
-     *
-     * @return Schema descriptor if initialized, {@code null} otherwise.
-     * @throws SchemaRegistryException If failed.
-     */
+    /** {@inheritDoc} */
     @Override public @Nullable SchemaDescriptor schema() {
         final int lastVer0 = lastVer;
 
         if (lastVer0 == INITIAL_SCHEMA_VERSION)
             return null;
 
-       return schema(lastVer0);
+        return schema(lastVer0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lastSchemaVersion() {
+        return lastVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row resolve(BinaryRow row) {
+        final SchemaDescriptor rowSchema = schema(row.schemaVersion());
+        final SchemaDescriptor curSchema = schema();
+
+        if (curSchema.version() == rowSchema.version())
+            return new Row(rowSchema, row);
+
+        return new UpgradingRowAdapter(curSchema, row, columnMapper(curSchema, 
rowSchema));
     }
 
     /**
-     * @return Last known schema version.
+     * Create column mapping for schemas.
+     *
+     * @param src Source schema of newer version.
+     * @param dst Target schema of older version.
+     * @return Column mapping.
      */
-    public int lastSchemaVersion() {
-        return lastVer;
+    private ColumnMapping columnMapper(SchemaDescriptor src, SchemaDescriptor 
dst) {
+        assert src.version() > dst.version();
+        assert src.version() == dst.version() + 1; // TODO: IGNITE-14863 
implement merged mapper for arbitrary schema versions.
+
+        final Columns srcCols = src.valueColumns();
+        final Columns dstCols = dst.valueColumns();
+
+        final ColumnMapping mapping = new ColumnMapping(src);
+
+        for (int i = 0; i < srcCols.columns().length; i++) {
+            final Column col = srcCols.column(i);
+
+            try {
+                final int idx = dstCols.columnIndex(col.name());
+
+                if (!col.equals(dstCols.column(idx)))
+                    throw new InvalidTypeException("Column of incompatible 
type: [colIdx=" + col.schemaIndex() + ", schemaVer=" + src.version());
+
+                mapping.add(col.schemaIndex(), dst.keyColumns().length() + 
idx);
+            }
+            catch (NoSuchElementException ex) {
+                mapping.add(col.schemaIndex(), -1);
+            }
+        }
+
+        return mapping;
     }
 
     /**
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
new file mode 100644
index 0000000..8c9a4fc
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.registry;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.InvalidTypeException;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+
+/**
+ * Adapter for row of older schema.
+ */
+class UpgradingRowAdapter extends Row {
+    /** Column mapper. */
+    private final ColumnMapping mapping;
+
+    /**
+     * @param schema Schema descriptor of new version.
+     * @param row Row.
+     * @param mapping Column mapping.
+     */
+    UpgradingRowAdapter(SchemaDescriptor schema, BinaryRow row, ColumnMapping 
mapping) {
+        super(schema, row);
+
+        this.mapping = mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long findColumn(int colIdx, NativeTypeSpec type) 
throws InvalidTypeException {
+        if (schema.isKeyColumn(colIdx))
+            return super.findColumn(colIdx, type);
+
+        int mapIdx = mapping.map(colIdx);
+
+        return (mapIdx < 0) ? Long.MIN_VALUE : super.findColumn(mapIdx, type);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index e3dd181..515889c 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -253,9 +254,17 @@ public class ITDistributedTableTest {
                 return SCHEMA;
             }
 
+            @Override public int lastSchemaVersion() {
+                return SCHEMA.version();
+            }
+
             @Override public SchemaDescriptor schema(int ver) {
                 return SCHEMA;
             }
+
+            @Override public Row resolve(BinaryRow row) {
+                return new Row(SCHEMA, row);
+            }
         });
 
         partitionedTableView(tbl, PARTS * 10);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index be1c3ac..2ba0bbe 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.InvokeProcessor;
@@ -307,9 +306,9 @@ public class KVBinaryViewImpl extends AbstractTableView 
implements KeyValueBinar
         if (row == null)
             return null;
 
-        final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
+        final Row wrapped = schemaReg.resolve(row);
 
-        return new TableRow(schema, new Row(schema, row));
+        return new TableRow(wrapped.rowSchema(), wrapped);
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 99c8c8d..d4bdc23 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.table.InvokeProcessor;
@@ -395,9 +394,9 @@ public class TableImpl extends AbstractTableView implements 
Table {
         if (row == null)
             return null;
 
-        final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
+        final Row wrapped = schemaReg.resolve(row);
 
-        return new TableRow(schema, new Row(schema, row));
+        return new TableRow(wrapped.rowSchema(), wrapped);
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
index 74ed57a..8814951 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Row to Tuple adapter.
- *
+ * <p>
  * Provides methods to access columns values by column names.
  */
 public class TableRow extends RowChunkAdapter {
@@ -57,7 +57,7 @@ public class TableRow extends RowChunkAdapter {
         final Column col = schema.column(colName);
 
         if (col == null)
-            throw new IllegalArgumentException("Invalid column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
+            throw new ColumnNotFoundException("Invalid column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
 
         return col;
     }
@@ -81,7 +81,7 @@ public class TableRow extends RowChunkAdapter {
         return row;
     }
 
-    /** */
+    /** {@inheritDoc} */
     @Override public boolean contains(String colName) {
         return schema.column(colName) != null;
     }
@@ -100,12 +100,12 @@ public class TableRow extends RowChunkAdapter {
             final Column col = schema.column(colName);
 
             if (col == null || !schema.isKeyColumn(col.schemaIndex()))
-                throw new IllegalArgumentException("Invalid key column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
+                throw new ColumnNotFoundException("Invalid key column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
 
             return col;
         }
 
-        /** */
+        /** {@inheritDoc} */
         @Override public boolean contains(String colName) {
             return schema.column(colName) != null;
         }
@@ -125,12 +125,12 @@ public class TableRow extends RowChunkAdapter {
             final Column col = schema.column(colName);
 
             if (col == null || schema.isKeyColumn(col.schemaIndex()))
-                throw new IllegalArgumentException("Invalid key column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
+                throw new ColumnNotFoundException("Invalid value column name: 
columnName=" + colName + ", schemaVersion=" + schema.version());
 
             return col;
         }
 
-        /** */
+        /** {@inheritDoc} */
         @Override public boolean contains(String colName) {
             return schema.column(colName) != null;
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4b14ad9..a535c11 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -31,9 +31,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.internal.affinity.AffinityManager;
 import org.apache.ignite.internal.affinity.event.AffinityEvent;
 import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
@@ -199,135 +201,240 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         //TODO: IGNITE-14652 Change a metastorage update in listeners to 
multi-invoke
         
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx
 -> {
             Set<String> tablesToStart = (ctx.newValue() == null || 
ctx.newValue().namedListKeys() == null) ?
-                Collections.emptySet() : new 
HashSet<>(ctx.newValue().namedListKeys());
+                Collections.emptySet() :
+                ctx.newValue().namedListKeys().stream().filter(t -> 
!ctx.oldValue().namedListKeys().contains(t)).collect(Collectors.toSet());
 
-            tablesToStart.removeAll(ctx.oldValue().namedListKeys());
-
-            long revision = ctx.storageRevision();
+            Set<String> tablesToStop = (ctx.oldValue() == null || 
ctx.oldValue().namedListKeys() == null) ?
+                Collections.emptySet() :
+                ctx.oldValue().namedListKeys().stream().filter(t -> 
!ctx.newValue().namedListKeys().contains(t)).collect(Collectors.toSet());
 
             List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
-            boolean hasMetastorageLocally = 
metaStorageMgr.hasMetastorageLocally(configurationMgr);
+            final Set<String> schemaChanged =
+                (ctx.oldValue() != null && ctx.oldValue().namedListKeys() != 
null && ctx.newValue() != null && ctx.newValue().namedListKeys() != null) ?
+                    ctx.oldValue().namedListKeys().stream()
+                        .filter(tblName -> 
ctx.newValue().namedListKeys().contains(tblName)) // Filter changed tables.
+                        .filter(tblName -> {
+                            final TableView newTbl = 
ctx.newValue().get(tblName);
+                            final TableView oldTbl = 
ctx.oldValue().get(tblName);
 
-            for (String tblName : tablesToStart) {
-                TableView tableView = ctx.newValue().get(tblName);
+                            assert newTbl.columns().namedListKeys() != null && 
oldTbl.columns().namedListKeys() != null;
 
-                UUID tblId = new UUID(revision, 0L);
+                            return 
newTbl.columns().namedListKeys().stream().anyMatch(c -> 
!oldTbl.columns().namedListKeys().contains(c)) ||
+                                
oldTbl.columns().namedListKeys().stream().anyMatch(c -> 
!newTbl.columns().namedListKeys().contains(c));
+                        }).collect(Collectors.toSet()) :
+                    Collections.emptySet();
 
-                if (hasMetastorageLocally) {
-                    var key = new ByteArray(INTERNAL_PREFIX + tblId);
-                    futs.add(metaStorageMgr.invoke(
-                        Conditions.notExists(key),
-                        Operations.put(key, 
tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop())
-                        .thenCompose(res -> 
schemaMgr.initSchemaForTable(tblId, tableView.name()))
-                        .thenCompose(res -> affMgr.calculateAssignments(tblId, 
tableView.name())));
-                }
+            if (!tablesToStart.isEmpty())
+                futs.addAll(startTables(tablesToStart, ctx.storageRevision(), 
ctx.newValue()));
 
-                final CompletableFuture<AffinityEventParameters> 
affinityReadyFut = new CompletableFuture<>();
-                final CompletableFuture<SchemaEventParameters> schemaReadyFut 
= new CompletableFuture<>();
+            if (!schemaChanged.isEmpty())
+                futs.addAll(changeSchema(schemaChanged));
 
-                CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
-                    .exceptionally(e -> {
-                        LOG.error("Failed to create a new table [name=" + 
tblName + ", id=" + tblId + ']', e);
+            if (!tablesToStop.isEmpty())
+                futs.addAll(stopTables(tablesToStop));
 
-                        onEvent(TableEvent.CREATE, new 
TableEventParameters(tblId, tblName), e);
+            return 
CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
+        });
+    }
 
-                        return null;
-                    })
-                    .thenRun(() -> createTableLocally(
-                        tblName,
-                        tblId,
-                        affinityReadyFut.join().assignment(),
-                        schemaReadyFut.join().schemaRegistry()
-                    ));
+    /**
+     * Start tables routine.
+     *
+     * @param tbls Tables to start.
+     * @param rev Metastore revision.
+     * @param cfgs Table configurations.
+     * @return Table creation futures.
+     */
+    private List<CompletableFuture<Boolean>> startTables(Set<String> tbls, 
long rev, NamedListView<TableView> cfgs) {
+        boolean hasMetastorageLocally = 
metaStorageMgr.hasMetastorageLocally(configurationMgr);
+
+        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
-                affMgr.listen(AffinityEvent.CALCULATED, new 
EventListener<AffinityEventParameters>() {
-                    @Override public boolean notify(@NotNull 
AffinityEventParameters parameters, @Nullable Throwable e) {
-                        if (!tblId.equals(parameters.tableId()))
-                            return false;
+        for (String tblName : tbls) {
+            TableView tableView = cfgs.get(tblName);
+
+            UUID tblId = new UUID(rev, 0L);
+
+            if (hasMetastorageLocally) {
+                var key = new ByteArray(INTERNAL_PREFIX + tblId);
+                futs.add(metaStorageMgr.invoke(
+                    Conditions.notExists(key),
+                    Operations.put(key, 
tableView.name().getBytes(StandardCharsets.UTF_8)),
+                    Operations.noop())
+                    .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, 
tableView.name()))
+                    .thenCompose(res -> affMgr.calculateAssignments(tblId, 
tableView.name())));
+            }
 
-                        if (e == null)
-                            affinityReadyFut.complete(parameters);
-                        else
-                            affinityReadyFut.completeExceptionally(e);
+            final CompletableFuture<AffinityEventParameters> affinityReadyFut 
= new CompletableFuture<>();
+            final CompletableFuture<SchemaEventParameters> schemaReadyFut = 
new CompletableFuture<>();
 
-                        return true;
-                    }
+            CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
+                .exceptionally(e -> {
+                    LOG.error("Failed to create a new table [name=" + tblName 
+ ", id=" + tblId + ']', e);
 
-                    @Override public void remove(@NotNull Throwable e) {
+                    onEvent(TableEvent.CREATE, new TableEventParameters(tblId, 
tblName), e);
+
+                    return null;
+                })
+                .thenRun(() -> createTableLocally(
+                    tblName,
+                    tblId,
+                    affinityReadyFut.join().assignment(),
+                    schemaReadyFut.join().schemaRegistry()
+                ));
+
+            affMgr.listen(AffinityEvent.CALCULATED, new EventListener<>() {
+                @Override public boolean notify(@NotNull 
AffinityEventParameters parameters, @Nullable Throwable e) {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
+
+                    if (e == null)
+                        affinityReadyFut.complete(parameters);
+                    else
                         affinityReadyFut.completeExceptionally(e);
-                    }
-                });
 
-                schemaMgr.listen(SchemaEvent.INITIALIZED, new 
EventListener<SchemaEventParameters>() {
-                    @Override public boolean notify(@NotNull 
SchemaEventParameters parameters, @Nullable Throwable e) {
-                        if (!tblId.equals(parameters.tableId()))
-                            return false;
+                    return true;
+                }
 
-                        if (e == null)
-                            schemaReadyFut.complete(parameters);
-                        else
-                            schemaReadyFut.completeExceptionally(e);
+                @Override public void remove(@NotNull Throwable e) {
+                    affinityReadyFut.completeExceptionally(e);
+                }
+            });
 
-                        return true;
-                    }
+            schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<>() {
+                @Override public boolean notify(@NotNull SchemaEventParameters 
parameters, @Nullable Throwable e) {
+                    if (!tblId.equals(parameters.tableId()) && 
parameters.schemaRegistry().lastSchemaVersion() >= 1)
+                        return false;
 
-                    @Override public void remove(@NotNull Throwable e) {
+                    if (e == null)
+                        schemaReadyFut.complete(parameters);
+                    else
                         schemaReadyFut.completeExceptionally(e);
-                    }
-                });
-            }
 
-            Set<String> tablesToStop = (ctx.oldValue() == null || 
ctx.oldValue().namedListKeys() == null) ?
-                Collections.emptySet() : new 
HashSet<>(ctx.oldValue().namedListKeys());
+                    return true;
+                }
 
-            tablesToStop.removeAll(ctx.newValue().namedListKeys());
+                @Override public void remove(@NotNull Throwable e) {
+                    schemaReadyFut.completeExceptionally(e);
+                }
+            });
+        }
 
-            for (String tblName : tablesToStop) {
-                TableImpl t = tables.get(tblName);
+        return futs;
+    }
+
+    /**
+     * Drop tables routine.
+     *
+     * @param tbls Tables to drop.
+     * @return Table drop futures.
+     */
+    private List<CompletableFuture<Boolean>> stopTables(Set<String> tbls) {
+        boolean hasMetastorageLocally = 
metaStorageMgr.hasMetastorageLocally(configurationMgr);
 
-                UUID tblId = t.tableId();
+        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
-                if (hasMetastorageLocally) {
-                    var key = new ByteArray(INTERNAL_PREFIX + tblId);
+        for (String tblName : tbls) {
+            TableImpl t = tables.get(tblName);
 
-                    futs.add(affMgr.removeAssignment(tblId)
-                        .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
-                        .thenCompose(res ->
-                            metaStorageMgr.invoke(Conditions.exists(key),
-                                Operations.remove(key),
-                                Operations.noop())));
-                }
+            UUID tblId = t.tableId();
 
-                affMgr.listen(AffinityEvent.REMOVED, new 
EventListener<AffinityEventParameters>() {
-                    @Override public boolean notify(@NotNull 
AffinityEventParameters parameters, @Nullable Throwable e) {
-                        if (!tblId.equals(parameters.tableId()))
-                            return false;
+            if (hasMetastorageLocally) {
+                var key = new ByteArray(INTERNAL_PREFIX + tblId);
 
-                        if (e == null)
-                            dropTableLocally(tblName, tblId, 
parameters.assignment());
-                        else
-                            onEvent(TableEvent.DROP, new 
TableEventParameters(tblId, tblName), e);
+                futs.add(affMgr.removeAssignment(tblId)
+                    .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
+                    .thenCompose(res ->
+                        metaStorageMgr.invoke(Conditions.exists(key),
+                            Operations.remove(key),
+                            Operations.noop())));
+            }
 
-                        return true;
-                    }
+            affMgr.listen(AffinityEvent.REMOVED, new EventListener<>() {
+                @Override public boolean notify(@NotNull 
AffinityEventParameters parameters, @Nullable Throwable e) {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
 
-                    @Override public void remove(@NotNull Throwable e) {
+                    if (e == null)
+                        dropTableLocally(tblName, tblId, 
parameters.assignment());
+                    else
                         onEvent(TableEvent.DROP, new 
TableEventParameters(tblId, tblName), e);
-                    }
-                });
-            }
 
-            return 
CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
-        });
+                    return true;
+                }
+
+                @Override public void remove(@NotNull Throwable e) {
+                    onEvent(TableEvent.DROP, new TableEventParameters(tblId, 
tblName), e);
+                }
+            });
+        }
+
+        return futs;
+    }
+
+    /**
+     * Schema change routine: requests a schema upgrade for every given table and fires
+     * {@link TableEvent#ALTER} once the upgrade is confirmed by a {@link SchemaEvent#CHANGED} event or has failed.
+     *
+     * @param tbls Names of the tables whose column set has changed.
+     * @return Schema update futures; futures are added only when the metastorage is available locally.
+     */
+    private List<CompletableFuture<Boolean>> changeSchema(Set<String> tbls) {
+        boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr);
+
+        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
+
+        for (String tblName : tbls) {
+            TableImpl tbl = tables.get(tblName);
+
+            UUID tblId = tbl.tableId();
+
+            // Schema version the registry is expected to report once this change is applied.
+            final int ver = tbl.schemaView().lastSchemaVersion() + 1;
+
+            if (hasMetastorageLocally)
+                futs.add(schemaMgr.updateSchemaForTable(tblId, tblName));
+
+            final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();
+
+            // Fire exactly one ALTER event per table. Chaining exceptionally(...).thenRun(...) would fire
+            // the event a second time with a null error after a failure, because exceptionally() recovers the stage.
+            schemaReadyFut.whenComplete((res, e) -> {
+                if (e != null)
+                    LOG.error("Failed to upgrade schema for a table [name=" + tblName + ", id=" + tblId + ']', e);
+
+                onEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), e);
+            });
+
+            schemaMgr.listen(SchemaEvent.CHANGED, new EventListener<>() {
+                @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
+                    // Events of other tables must never complete this table's future.
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
+
+                    if (e != null) {
+                        schemaReadyFut.completeExceptionally(e);
+
+                        return true;
+                    }
+
+                    // Stale notification: the registry has not reached the expected version yet, keep waiting.
+                    if (parameters.schemaRegistry().lastSchemaVersion() < ver)
+                        return false;
+
+                    schemaReadyFut.complete(parameters);
+
+                    return true;
+                }
+
+                @Override public void remove(@NotNull Throwable e) {
+                    schemaReadyFut.completeExceptionally(e);
+                }
+            });
+        }
+
+        return futs;
     }
 
     /** {@inheritDoc} */
     @Override public Table createTable(String name, Consumer<TableChange> 
tableInitChange) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
 
-        listen(TableEvent.CREATE, new EventListener<TableEventParameters>() {
+        listen(TableEvent.CREATE, new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters 
parameters, @Nullable Throwable e) {
                 String tableName = parameters.tableName();
 
@@ -362,10 +469,49 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     /** {@inheritDoc} */
+    @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
+        CompletableFuture<Void> tblFut = new CompletableFuture<>();
+
+        // Completes tblFut when the ALTER event for the table with the given name is observed.
+        listen(TableEvent.ALTER, new EventListener<>() {
+            @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
+                String tableName = parameters.tableName();
+
+                if (!name.equals(tableName))
+                    return false;
+
+                if (e == null)
+                    tblFut.complete(null);
+                else
+                    tblFut.completeExceptionally(e);
+
+                return true;
+            }
+
+            @Override public void remove(@NotNull Throwable e) {
+                tblFut.completeExceptionally(e);
+            }
+        });
+
+        try {
+            configurationMgr.configurationRegistry()
+                .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
+                change.update(name, tableChange)).get();
+        }
+        catch (InterruptedException | ExecutionException e) {
+            // Restore the interrupt status so that callers up the stack can still observe it.
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+
+            LOG.error("Table wasn't altered [name=" + name + ']', e);
+
+            tblFut.completeExceptionally(e);
+        }
+
+        // NOTE(review): the ALTER event appears to be fired only when the set of column names changes;
+        // confirm that a configuration change which leaves the columns intact cannot block here forever.
+        tblFut.join();
+    }
+
+    /** {@inheritDoc} */
     @Override public void dropTable(String name) {
         CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
 
-        listen(TableEvent.DROP, new EventListener<TableEventParameters>() {
+        listen(TableEvent.DROP, new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters 
parameters, @Nullable Throwable e) {
                 String tableName = parameters.tableName();
 
@@ -428,7 +574,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     private HashSet<String> tableNamesConfigured() {
         IgniteBiTuple<ByteArray, ByteArray> range = toRange(new 
ByteArray(PUBLIC_PREFIX));
 
-        HashSet tableNames = new HashSet();
+        HashSet<String> tableNames = new HashSet<>();
 
         try (Cursor<Entry> cursor = metaStorageMgr.range(range.get1(), 
range.get2())) {
             while (cursor.hasNext()) {
@@ -438,7 +584,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 int idx = -1;
 
-                while ((idx = keyTail.indexOf('.', idx + 1)) > 0 && 
keyTail.charAt(idx - 1) == '\\');
+                //noinspection StatementWithEmptyBody
+                while ((idx = keyTail.indexOf('.', idx + 1)) > 0 && 
keyTail.charAt(idx - 1) == '\\')
+                    ;
 
                 String tablName = keyTail.substring(0, idx);
 
@@ -476,7 +624,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         CompletableFuture<Table> getTblFut = new CompletableFuture<>();
 
-        EventListener<TableEventParameters> clo = new 
EventListener<TableEventParameters>() {
+        EventListener<TableEventParameters> clo = new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters 
parameters, @Nullable Throwable e) {
                 if (e instanceof ListenerRemovedException) {
                     getTblFut.completeExceptionally(e);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
index f2a737e..94d508a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
@@ -26,6 +26,9 @@ public enum TableEvent implements Event {
     /** This event is fired when a table was created. */
     CREATE,
 
+    /** This event is fired when a table config was changed. */
+    ALTER,
+
     /** This event is fired when a table was dropped. */
     DROP
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java
index f9eea5b..d5ddca8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TableBinaryViewOperationsTest.java
@@ -53,8 +53,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -88,8 +88,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -124,8 +124,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -157,8 +157,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -188,8 +188,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -240,8 +240,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -276,8 +276,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {new Column("val", NativeTypes.LONG, false)}
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{new Column("val", NativeTypes.LONG, false)}
         );
 
         Table tbl = new TableImpl(new DummyInternalTableImpl(), new 
DummySchemaManagerImpl(schema));
@@ -313,8 +313,8 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{
                 new Column("val", NativeTypes.LONG, true),
                 new Column("str", NativeTypes.stringOf(3), true),
                 new Column("blob", NativeTypes.blobOf(3), true)
@@ -326,7 +326,7 @@ public class TableBinaryViewOperationsTest {
         final Tuple keyTuple0 = new TestTupleBuilder().set("id", 0).set("id1", 
0).build();
         final Tuple keyTuple1 = new TestTupleBuilder().set("id1", 0).build();
         final Tuple tuple0 = new TestTupleBuilder().set("id", 1L).set("str", 
"qweqweqwe").set("val", 11L).build();
-        final Tuple tuple1 = new TestTupleBuilder().set("id", 1L).set("blob", 
new byte[] {0, 1, 2, 3}).set("val", 22L).build();
+        final Tuple tuple1 = new TestTupleBuilder().set("id", 1L).set("blob", 
new byte[]{0, 1, 2, 3}).set("val", 22L).build();
 
         assertThrows(InvalidTypeException.class, () -> tbl.get(keyTuple0));
         assertThrows(IllegalArgumentException.class, () -> tbl.get(keyTuple1));
@@ -349,11 +349,11 @@ public class TableBinaryViewOperationsTest {
         SchemaDescriptor schema = new SchemaDescriptor(
             tableId,
             1,
-            new Column[] {new Column("id", NativeTypes.LONG, false)},
-            new Column[] {
+            new Column[]{new Column("id", NativeTypes.LONG, false)},
+            new Column[]{
                 new Column("val", NativeTypes.LONG, true, () -> 28L),
                 new Column("str", NativeTypes.stringOf(3), true, () -> "ABC"),
-                new Column("blob", NativeTypes.blobOf(3), true, () -> new 
byte[] {0, 1, 2})
+                new Column("blob", NativeTypes.blobOf(3), true, () -> new 
byte[]{0, 1, 2})
             }
         );
 
@@ -363,7 +363,7 @@ public class TableBinaryViewOperationsTest {
         final Tuple keyTuple1 = tbl.tupleBuilder().set("id", 1L).build();
 
         final Tuple tuple0 = tbl.tupleBuilder().set("id", 0L).build();
-        final Tuple tupleExpected0 = tbl.tupleBuilder().set("id", 
0L).set("val", 28L).set("str", "ABC").set("blob", new byte[] {0, 1, 2}).build();
+        final Tuple tupleExpected0 = tbl.tupleBuilder().set("id", 
0L).set("val", 28L).set("str", "ABC").set("blob", new byte[]{0, 1, 2}).build();
         final Tuple tuple1 = tbl.tupleBuilder().set("id", 1L).set("val", 
null).set("str", null).set("blob", null).build();
 
         tbl.insert(tuple0);
@@ -427,7 +427,7 @@ public class TableBinaryViewOperationsTest {
             if (val1 instanceof byte[] && val2 instanceof byte[])
                 Assertions.assertArrayEquals((byte[])val1, (byte[])val2, 
"Equality check failed: colIdx=" + col.schemaIndex());
             else
-               Assertions.assertEquals(val1, val2, "Equality check failed: 
colIdx=" + col.schemaIndex());
+                Assertions.assertEquals(val1, val2, "Equality check failed: 
colIdx=" + col.schemaIndex());
         }
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 9ecaec4..aa47bab 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.impl;
 
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.jetbrains.annotations.NotNull;
@@ -52,4 +54,16 @@ public class DummySchemaManagerImpl implements 
SchemaRegistry {
 
         return schema;
     }
+
+    /** {@inheritDoc} */
+    @Override public int lastSchemaVersion() {
+        // The version of the wrapped schema descriptor is reported as the latest one.
+        return schema.version();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row resolve(BinaryRow row) {
+        // Only rows written with the wrapped schema's version are expected here; no row upgrade is attempted.
+        assert row.schemaVersion() == schema.version();
+
+        return new Row(schema, row);
+    }
 }

Reply via email to