This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c9a1ab9  IGNITE-14864: Schema update. Merge multiple converters stages. (#194)
c9a1ab9 is described below

commit c9a1ab902045fda232c0caac89f4f682a62e5e24
Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com>
AuthorDate: Fri Jul 16 14:16:27 2021 +0300

    IGNITE-14864: Schema update. Merge multiple converters stages. (#194)
---
 .../runner/app/AbstractSchemaChangeTest.java       | 123 +++++++++++-
 .../runner/app/DynamicTableCreationTest.java       |  67 +++----
 .../runner/app/SchemaChangeKVViewTest.java         | 137 +++++++++++++
 .../runner/app/SchemaChangeTableViewTest.java      | 174 +++++++++++++++++
 .../internal/runner/app/TableCreationTest.java     | 212 ++++++++++-----------
 .../apache/ignite/internal/schema/NativeTypes.java |   1 +
 .../ignite/internal/schema/SchemaManager.java      |  38 ----
 .../schema/builder/SchemaTableBuilderImpl.java     |   3 +
 .../internal/schema/mapping/ColumnMapping.java     |  33 ++++
 .../schema/registry/SchemaRegistryImpl.java        |  54 +++++-
 .../schema/registry/UpgradingRowAdapter.java       |   6 +-
 .../{ => registry}/SchemaRegistryImplTest.java     |  66 ++++++-
 .../internal/table/distributed/TableManager.java   |   2 +-
 13 files changed, 720 insertions(+), 196 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index 3788b5a..8070888 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -22,8 +22,10 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -33,6 +35,9 @@ import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.schema.SchemaTable;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
@@ -94,6 +99,90 @@ abstract class AbstractSchemaChangeTest {
     }
 
     /**
+     * Check unsupported column type change.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056")
+    @Test
+    public void testChangeColumnType() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Assertions.assertThrows(InvalidTypeException.class, () -> {
+            grid.get(0).tables().alterTable(TABLE,
+                tblChanger -> tblChanger.changeColumns(cols -> {
+                    final String colKey = 
tblChanger.columns().namedListKeys().stream()
+                        .filter(c -> 
"valInt".equals(tblChanger.columns().get(c).name()))
+                        .findFirst()
+                        .orElseThrow(() -> {
+                            throw new IllegalStateException("Column not 
found.");
+                        });
+
+                    tblChanger.changeColumns(listChanger ->
+                        listChanger.update(colKey, colChanger -> 
colChanger.changeType(c -> c.changeType("STRING")))
+                    );
+                })
+            );
+        });
+    }
+
+    /**
+     * Check unsupported column nullability change.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056")
+    @Test
+    public void testMakeColumnNonNullable() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Assertions.assertThrows(InvalidTypeException.class, () -> {
+            grid.get(0).tables().alterTable(TABLE,
+                tblChanger -> tblChanger.changeColumns(cols -> {
+                    final String colKey = 
tblChanger.columns().namedListKeys().stream()
+                        .filter(c -> 
"valInt".equals(tblChanger.columns().get(c).name()))
+                        .findFirst()
+                        .orElseThrow(() -> {
+                            throw new IllegalStateException("Column not 
found.");
+                        });
+
+                    tblChanger.changeColumns(listChanger ->
+                        listChanger.update(colKey, colChanger -> 
colChanger.changeNullable(false))
+                    );
+                })
+            );
+        });
+    }
+
+    /**
+     * Check unsupported nullability change.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15056")
+    @Test
+    public void testMakeColumnsNullable() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Assertions.assertThrows(InvalidTypeException.class, () -> {
+            grid.get(0).tables().alterTable(TABLE,
+                tblChanger -> tblChanger.changeColumns(cols -> {
+                    final String colKey = 
tblChanger.columns().namedListKeys().stream()
+                        .filter(c -> 
"valStr".equals(tblChanger.columns().get(c).name()))
+                        .findFirst()
+                        .orElseThrow(() -> {
+                            throw new IllegalStateException("Column not 
found.");
+                        });
+
+                    tblChanger.changeColumns(listChanger ->
+                        listChanger.update(colKey, colChanger -> 
colChanger.changeNullable(true))
+                    );
+                })
+            );
+        });
+    }
+
+    /**
      * @return Grid nodes.
      */
     @NotNull protected List<Ignite> startGrid() {
@@ -131,7 +220,8 @@ abstract class AbstractSchemaChangeTest {
                 int colIdx = 
chng.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt()
 + 1;
 
                 cols.create(String.valueOf(colIdx), colChg -> 
convert(columnToAdd, colChg));
-            }));
+            })
+        );
     }
 
     /**
@@ -146,8 +236,10 @@ abstract class AbstractSchemaChangeTest {
                     .findAny()
                     .orElseThrow(() -> {
                         throw new IllegalStateException("Column not found.");
-                    }));
-            }));
+                    })
+                );
+            })
+        );
     }
 
     /**
@@ -168,6 +260,29 @@ abstract class AbstractSchemaChangeTest {
                 tblChanger.changeColumns(listChanger ->
                     listChanger.update(colKey, colChanger -> 
colChanger.changeName(newName))
                 );
-            }));
+            })
+        );
+    }
+
+    /**
+     * @param nodes Cluster nodes.
+     * @param colName Column name.
+     * @param defSup Default value supplier.
+     */
+    protected void changeDefault(List<Ignite> nodes, String colName, 
Supplier<Object> defSup) {
+        nodes.get(0).tables().alterTable(TABLE,
+            tblChanger -> tblChanger.changeColumns(cols -> {
+                final String colKey = 
tblChanger.columns().namedListKeys().stream()
+                    .filter(c -> 
colName.equals(tblChanger.columns().get(c).name()))
+                    .findFirst()
+                    .orElseThrow(() -> {
+                        throw new IllegalStateException("Column not found.");
+                    });
+
+                tblChanger.changeColumns(listChanger ->
+                    listChanger.update(colKey, colChanger -> 
colChanger.changeDefaultValue(defSup.get().toString()))
+                );
+            })
+        );
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
index 884e7ff..575db70 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -25,13 +25,11 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.IgnitionManager;
-import org.apache.ignite.internal.schema.SchemaManager;
 import 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.table.SchemaMismatchException;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.table.SchemaMismatchException;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.schema.ColumnType;
 import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.schema.SchemaTable;
@@ -53,41 +51,38 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 @Disabled("https://issues.apache.org/jira/browse/IGNITE-14581")
 @ExtendWith(WorkDirectoryExtension.class)
 class DynamicTableCreationTest {
-    /** The logger. */
-    private static final IgniteLogger LOG = 
IgniteLogger.forClass(SchemaManager.class);
-
     /** Nodes bootstrap configuration. */
     private final Map<String, String> nodesBootstrapCfg = new 
LinkedHashMap<>() {{
-            put("node0", "{\n" +
-                "  \"node\": {\n" +
-                "    \"metastorageNodes\":[ \"node0\" ]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3344,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  }\n" +
-                "}");
-
-            put("node1", "{\n" +
-                "  \"node\": {\n" +
-                "    \"metastorageNodes\":[ \"node0\" ]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3345,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  }\n" +
-                "}");
-
-            put("node2", "{\n" +
-                "  \"node\": {\n" +
-                "    \"metastorageNodes\":[ \"node0\"]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3346,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  }\n" +
-                "}");
-        }};
+        put("node0", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3344,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+
+        put("node1", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3345,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+
+        put("node2", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\"]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3346,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+    }};
 
     /** */
     private final List<Ignite> clusterNodes = new ArrayList<>();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
index 944802d..977914b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeKVViewTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.runner.app;
 
+import java.io.Serializable;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.internal.table.ColumnNotFoundException;
+import org.apache.ignite.schema.Column;
 import org.apache.ignite.schema.ColumnType;
 import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.table.KeyValueBinaryView;
@@ -166,4 +169,138 @@ class SchemaChangeKVViewTest extends AbstractSchemaChangeTest {
             assertThrows(ColumnNotFoundException.class, () -> 
kvView.get(keyTuple2).value("valInt"));
         }
     }
+
+    /**
+     * Check merge table schema changes.
+     */
+    @Test
+    public void testMergeChangesAddDropAdd() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        final Column column = SchemaBuilders.column("val", 
ColumnType.string()).asNullable().withDefaultValue("default").build();
+
+        KeyValueBinaryView kvView = grid.get(1).tables().table(TABLE).kvView();
+
+        {
+            kvView.put(kvView.tupleBuilder().set("key", 1L).build(),
+                kvView.tupleBuilder().set("valInt", 111).build());
+
+            assertThrows(ColumnNotFoundException.class, () -> kvView.put(
+                kvView.tupleBuilder().set("key", 2L).build(),
+                kvView.tupleBuilder().set("val", "I'not exists").build())
+            );
+        }
+
+        addColumn(grid, column);
+
+        {
+            assertNull(kvView.get(kvView.tupleBuilder().set("key", 
2L).build()));
+
+            kvView.put(kvView.tupleBuilder().set("key", 2L).build(),
+                kvView.tupleBuilder().set("valInt", 222).set("val", 
"string").build());
+
+            kvView.put(kvView.tupleBuilder().set("key", 3L).build(),
+                kvView.tupleBuilder().set("valInt", 333).build());
+        }
+
+        dropColumn(grid, column.name());
+
+        {
+            kvView.put(kvView.tupleBuilder().set("key", 4L).build(),
+                kvView.tupleBuilder().set("valInt", 444).build());
+
+            assertThrows(ColumnNotFoundException.class, () -> kvView.put(
+                kvView.tupleBuilder().set("key", 4L).build(),
+                kvView.tupleBuilder().set("val", "I'm not exist").build())
+            );
+        }
+
+        addColumn(grid, SchemaBuilders.column("val", 
ColumnType.string()).asNullable().withDefaultValue("default").build());
+
+        {
+            kvView.put(kvView.tupleBuilder().set("key", 5L).build(),
+                kvView.tupleBuilder().set("valInt", 555).build());
+
+            // Check old row conversion.
+            Tuple keyTuple1 = kvView.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(111, (Integer)kvView.get(keyTuple1).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple1).value("val"));
+
+            Tuple keyTuple2 = kvView.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(222, (Integer)kvView.get(keyTuple2).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple2).value("val"));
+
+            Tuple keyTuple3 = kvView.tupleBuilder().set("key", 3L).build();
+
+            assertEquals(333, (Integer)kvView.get(keyTuple3).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple3).value("val"));
+
+            Tuple keyTuple4 = kvView.tupleBuilder().set("key", 4L).build();
+
+            assertEquals(444, (Integer)kvView.get(keyTuple4).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple4).value("val"));
+
+            Tuple keyTuple5 = kvView.tupleBuilder().set("key", 5L).build();
+
+            assertEquals(555, (Integer)kvView.get(keyTuple5).value("valInt"));
+            assertEquals("default", kvView.get(keyTuple5).value("val"));
+        }
+    }
+
+
+    /**
+     * Check merge table schema changes.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14896")
+    @Test
+    public void testMergeChangesColumnDefault() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        KeyValueBinaryView vkView = grid.get(1).tables().table(TABLE).kvView();
+
+        final String colName = "valStr";
+
+        {
+            vkView.put(vkView.tupleBuilder().set("key", 1L).build(), 
vkView.tupleBuilder().set("valInt", 111).build());
+        }
+
+        changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> 
"newDefault");
+        addColumn(grid, SchemaBuilders.column("val", 
ColumnType.string()).withDefaultValue("newDefault").build());
+
+        {
+            vkView.put(vkView.tupleBuilder().set("key", 2L).build(), 
vkView.tupleBuilder().set("valInt", 222).build());
+        }
+
+        changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> 
"brandNewDefault");
+        changeDefault(grid, "val", (Supplier<Object> & Serializable)() -> 
"brandNewDefault");
+
+        {
+            vkView.put(vkView.tupleBuilder().set("key", 3L).build(), 
vkView.tupleBuilder().set("valInt", 333).build());
+
+            // Check old row conversion.
+            Tuple keyTuple1 = vkView.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(111, (Integer)vkView.get(keyTuple1).value("valInt"));
+            assertEquals("default", vkView.get(keyTuple1).value("valStr"));
+            assertEquals("newDefault", vkView.get(keyTuple1).value("val"));
+
+            Tuple keyTuple2 = vkView.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(222, (Integer)vkView.get(keyTuple2).value("valInt"));
+            assertEquals("newDefault", vkView.get(keyTuple2).value("valStr"));
+            assertEquals("newDefault", vkView.get(keyTuple2).value("val"));
+
+            Tuple keyTuple3 = vkView.tupleBuilder().set("key", 3L).build();
+
+            assertEquals(333, (Integer)vkView.get(keyTuple3).value("valInt"));
+            assertEquals("brandNewDefault", 
vkView.get(keyTuple3).value("valStr"));
+            assertEquals("brandNewDefault", 
vkView.get(keyTuple3).value("val"));
+        }
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
index 090ce72..d5836bd 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTableViewTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.runner.app;
 
+import java.io.Serializable;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.internal.table.ColumnNotFoundException;
+import org.apache.ignite.schema.Column;
 import org.apache.ignite.schema.ColumnType;
 import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.table.Table;
@@ -28,6 +31,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
@@ -158,4 +162,174 @@ class SchemaChangeTableViewTest extends AbstractSchemaChangeTest {
             assertThrows(ColumnNotFoundException.class, () -> 
tbl.get(keyTuple2).value("valInt"));
         }
     }
+
+    /**
+     * Rename column then add a new column with same name.
+     */
+    @Test
+    void testRenameThenAddColumnWithSameName() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Table tbl = grid.get(0).tables().table(TABLE);
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 
111).build());
+
+            assertThrows(ColumnNotFoundException.class,
+                () -> tbl.insert(tbl.tupleBuilder().set("key", 2L).set("val2", 
-222).build())
+            );
+        }
+
+        renameColumn(grid, "valInt", "val2");
+        addColumn(grid, SchemaBuilders.column("valInt", 
ColumnType.INT32).asNullable().withDefaultValue(-1).build());
+
+        {
+            // Check old row conversion.
+            Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(1, (Long)tbl.get(keyTuple1).value("key"));
+            assertEquals(111, (Integer)tbl.get(keyTuple1).value("val2"));
+            assertEquals(-1, (Integer)tbl.get(keyTuple1).value("valInt"));
+
+            // Check tuple of outdated schema.
+            assertNull(tbl.get(tbl.tupleBuilder().set("key", 2L).build()));
+
+            // Check tuple of correct schema.
+            tbl.insert(tbl.tupleBuilder().set("key", 2L).set("val2", 
222).build());
+
+            Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(2, (Long)tbl.get(keyTuple2).value("key"));
+            assertEquals(222, (Integer)tbl.get(keyTuple2).value("val2"));
+            assertEquals(-1, (Integer)tbl.get(keyTuple2).value("valInt"));
+        }
+    }
+
+    /**
+     * Check merge table schema changes.
+     */
+    @Test
+    public void testMergeChangesAddDropAdd() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        final Column column = SchemaBuilders.column("val", 
ColumnType.string()).asNullable().withDefaultValue("default").build();
+
+        Table tbl = grid.get(0).tables().table(TABLE);
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 
111).build());
+
+            assertThrows(ColumnNotFoundException.class, () -> tbl.insert(
+                tbl.tupleBuilder().set("key", 2L).set("val", "I'not 
exists").build())
+            );
+        }
+
+        addColumn(grid, column);
+
+        {
+            assertNull(tbl.get(tbl.tupleBuilder().set("key", 2L).build()));
+
+            tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 
222).set("val", "string").build());
+
+            tbl.insert(tbl.tupleBuilder().set("key", 3L).set("valInt", 
333).build());
+        }
+
+        dropColumn(grid, column.name());
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 4L).set("valInt", 
444).build());
+
+            assertThrows(ColumnNotFoundException.class, () -> tbl.insert(
+                tbl.tupleBuilder().set("key", 4L).set("val", "I'm not 
exist").build())
+            );
+        }
+
+        addColumn(grid, SchemaBuilders.column("val", 
ColumnType.string()).withDefaultValue("default").build());
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 5L).set("valInt", 
555).build());
+
+            // Check old row conversion.
+            Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple1).value("val"));
+
+            Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple2).value("val"));
+
+            Tuple keyTuple3 = tbl.tupleBuilder().set("key", 3L).build();
+
+            assertEquals(333, (Integer)tbl.get(keyTuple3).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple3).value("val"));
+
+            Tuple keyTuple4 = tbl.tupleBuilder().set("key", 4L).build();
+
+            assertEquals(444, (Integer)tbl.get(keyTuple4).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple4).value("val"));
+
+            Tuple keyTuple5 = tbl.tupleBuilder().set("key", 5L).build();
+
+            assertEquals(555, (Integer)tbl.get(keyTuple5).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple5).value("val"));
+        }
+    }
+
+    /**
+     * Check merge column default value changes.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14896")
+    @Test
+    public void testMergeChangesColumnDefault() {
+        List<Ignite> grid = startGrid();
+
+        createTable(grid);
+
+        Table tbl = grid.get(0).tables().table(TABLE);
+
+        final String colName = "valStr";
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 1L).set("valInt", 
111).build());
+        }
+
+        changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> 
"newDefault");
+        addColumn(grid, SchemaBuilders.column("val", 
ColumnType.string()).withDefaultValue("newDefault").build());
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 2L).set("valInt", 
222).build());
+        }
+
+        changeDefault(grid, colName, (Supplier<Object> & Serializable)() -> 
"brandNewDefault");
+        changeDefault(grid, "val", (Supplier<Object> & Serializable)() -> 
"brandNewDefault");
+
+        {
+            tbl.insert(tbl.tupleBuilder().set("key", 3L).set("valInt", 
333).build());
+
+            // Check old row conversion.
+            Tuple keyTuple1 = tbl.tupleBuilder().set("key", 1L).build();
+
+            assertEquals(111, (Integer)tbl.get(keyTuple1).value("valInt"));
+            assertEquals("default", tbl.get(keyTuple1).value("valStr"));
+            assertEquals("newDefault", tbl.get(keyTuple1).value("val"));
+
+            Tuple keyTuple2 = tbl.tupleBuilder().set("key", 2L).build();
+
+            assertEquals(222, (Integer)tbl.get(keyTuple2).value("valInt"));
+            assertEquals("newDefault", tbl.get(keyTuple2).value("valStr"));
+            assertEquals("newDefault", tbl.get(keyTuple2).value("val"));
+
+            Tuple keyTuple3 = tbl.tupleBuilder().set("key", 3L).build();
+
+            assertEquals(333, (Integer)tbl.get(keyTuple3).value("valInt"));
+            assertEquals("brandNewDefault", 
tbl.get(keyTuple3).value("valStr"));
+            assertEquals("brandNewDefault", tbl.get(keyTuple3).value("val"));
+        }
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
index cedb240..03b8b8d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
@@ -47,112 +47,112 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 class TableCreationTest {
     /** Nodes bootstrap configuration with preconfigured tables. */
     private final LinkedHashMap<String, String> nodesBootstrapCfg = new 
LinkedHashMap<>() {{
-            put("node0", "{\n" +
-                "  \"node\": {\n" +
-                "    \"name\":node0,\n" +
-                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3344,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  },\n" +
-                "  \"table\": {\n" +
-                "       \"tables\": {\n" +
-                "           \"tbl1\": {\n" +
-                "               \"partitions\":10,\n" +
-                "               \"replicas\":2,\n" +
-                "               \"columns\": { \n" +
-                "                   \"key\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":UUID\n" +
-                "                       },\n" +
-                "                       \"nullable\":false\n" +
-                "                   },\n" +
-                "                   \"affKey\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":INT64\n" +
-                "                       },\n" +
-                "                       \"nullable\":false\n" +
-                "                   },\n" +
-                "                   \"valString\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":String\n" +
-                "                       },\n" +
-                "                       \"nullable\":false\n" +
-                "                   },\n" +
-                "                   \"valInt\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":INT32\n" +
-                "                       },\n" +
-                "                       \"nullable\":false\n" +
-                "                   },\n" +
-                "                   \"valNullable\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":String\n" +
-                "                       },\n" +
-                "                       \"nullable\":true\n" +
-                "                   }\n" +
-                "               },\n" + /* Columns. */
-                "               \"indices\": {\n" +
-                "                   \"PK\": {\n" +
-                "                       \"type\":PRIMARY,\n" +
-                "                       \"columns\": {\n" +
-                "                           \"key\": {\n" +
-                "                               \"asc\":true\n" +
-                "                           },\n" +
-                "                           \"affKey\": {}\n" +
-                "                       },\n" + /* Columns. */
-                "                       \"affinityColumns\":[ \"affKey\" ]\n" +
-                "                   }\n" +
-                "               }\n" + /* Indices. */
-                "           },\n" + /* Table. */
-                "\n" +
-                "           \"tbl2\": {\n" + // Table minimal configuration.
-                "               \"columns\": { \n" +
-                "                   \"key\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":INT64\n" +
-                "                       },\n" +
-                "                   },\n" +
-                "                   \"val\": {\n" +
-                "                       \"type\": {" +
-                "                           \"type\":INT64\n" +
-                "                       },\n" +
-                "                   }\n" +
-                "               },\n" + /* Columns. */
-                "               \"indices\": {\n" +
-                "                   \"PK\": {\n" +
-                "                       \"type\":PRIMARY,\n" +
-                "                       \"columns\": {\n" +
-                "                           \"key\": {}\n" +
-                "                       },\n" + /* Columns. */
-                "                   }\n" +
-                "               }\n" + /* Indices. */
-                "           }\n" + /* Table. */
-                "       }\n" + /* Tables. */
-                "  }\n" + /* Root. */
-                "}");
-
-            put("node1", "{\n" +
-                "  \"node\": {\n" +
-                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3345,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  }\n" +
-                "}");
-
-            put("node2", "{\n" +
-                "  \"node\": {\n" +
-                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
-                "  },\n" +
-                "  \"network\": {\n" +
-                "    \"port\":3346,\n" +
-                "    \"netClusterNodes\":[ \"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\" ]\n" +
-                "  }\n" +
-                "}");
-        }};
+        put("node0", "{\n" +
+            "  \"node\": {\n" +
+            "    \"name\":node0,\n" +
+            "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3344,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  },\n" +
+            "  \"table\": {\n" +
+            "       \"tables\": {\n" +
+            "           \"tbl1\": {\n" +
+            "               \"partitions\":10,\n" +
+            "               \"replicas\":2,\n" +
+            "               \"columns\": { \n" +
+            "                   \"key\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":UUID\n" +
+            "                       },\n" +
+            "                       \"nullable\":false\n" +
+            "                   },\n" +
+            "                   \"affKey\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":INT64\n" +
+            "                       },\n" +
+            "                       \"nullable\":false\n" +
+            "                   },\n" +
+            "                   \"valString\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":String\n" +
+            "                       },\n" +
+            "                       \"nullable\":false\n" +
+            "                   },\n" +
+            "                   \"valInt\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":INT32\n" +
+            "                       },\n" +
+            "                       \"nullable\":false\n" +
+            "                   },\n" +
+            "                   \"valNullable\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":String\n" +
+            "                       },\n" +
+            "                       \"nullable\":true\n" +
+            "                   }\n" +
+            "               },\n" + /* Columns. */
+            "               \"indices\": {\n" +
+            "                   \"PK\": {\n" +
+            "                       \"type\":PRIMARY,\n" +
+            "                       \"columns\": {\n" +
+            "                           \"key\": {\n" +
+            "                               \"asc\":true\n" +
+            "                           },\n" +
+            "                           \"affKey\": {}\n" +
+            "                       },\n" + /* Columns. */
+            "                       \"affinityColumns\":[ \"affKey\" ]\n" +
+            "                   }\n" +
+            "               }\n" + /* Indices. */
+            "           },\n" + /* Table. */
+            "\n" +
+            "           \"tbl2\": {\n" + // Table minimal configuration.
+            "               \"columns\": { \n" +
+            "                   \"key\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":INT64\n" +
+            "                       },\n" +
+            "                   },\n" +
+            "                   \"val\": {\n" +
+            "                       \"type\": {" +
+            "                           \"type\":INT64\n" +
+            "                       },\n" +
+            "                   }\n" +
+            "               },\n" + /* Columns. */
+            "               \"indices\": {\n" +
+            "                   \"PK\": {\n" +
+            "                       \"type\":PRIMARY,\n" +
+            "                       \"columns\": {\n" +
+            "                           \"key\": {}\n" +
+            "                       },\n" + /* Columns. */
+            "                   }\n" +
+            "               }\n" + /* Indices. */
+            "           }\n" + /* Table. */
+            "       }\n" + /* Tables. */
+            "  }\n" + /* Root. */
+            "}");
+
+        put("node1", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3345,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+
+        put("node2", "{\n" +
+            "  \"node\": {\n" +
+            "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+            "  },\n" +
+            "  \"network\": {\n" +
+            "    \"port\":3346,\n" +
+            "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", 
\"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}");
+    }};
 
     /** */
     private final List<Ignite> clusterNodes = new ArrayList<>();
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java
index 1d5bedb..3fc2143 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypes.java
@@ -90,6 +90,7 @@ public class NativeTypes {
      *
      * @param precision Precision.
      * @param scale Scale.
+     * @return Native type.
      */
     public static NativeType decimalOf(int precision, int scale) {
         return new NumericNativeType(precision, scale);
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 42039d1..bc653c2 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -208,50 +208,12 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * registers initial schema from configuration.
      *
      * @param tblId Table id.
-     * @param tblName Table name.
-     * @return Operation future.
-     */
-    public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, 
String tblName) {
-        return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
-            thenCompose(entry -> {
-                TableConfiguration tblConfig = 
configurationMgr.configurationRegistry().
-                    
getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
-
-                assert !entry.empty();
-
-                final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 
0);
-                final int newVer = oldVer + 1;
-
-                final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + 
tblId);
-                final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + 
tblId + INTERNAL_VER_SUFFIX + newVer);
-
-                SchemaTable schemaTable = 
SchemaConfigurationConverter.convert(tblConfig.value());
-                final SchemaDescriptor desc = 
SchemaDescriptorConverter.convert(tblId, newVer, schemaTable);
-
-                return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
-                    Operations.put(schemaKey, ByteUtils.toBytes(desc)),
-                    Operations.noop())
-                    //TODO: IGNITE-14679 Serialize schema.
-                    .thenCompose(res -> metaStorageMgr.invoke(
-                        
Conditions.value(lastVerKey).eq(ByteUtils.longToBytes(oldVer)),
-                        Operations.put(lastVerKey, 
ByteUtils.longToBytes(newVer)),
-                        Operations.noop()));
-            });
-    }
-
-    /**
-     * Creates schema registry for the table with existed schema or
-     * registers initial schema from configuration.
-     *
-     * @param tblId Table id.
-     * @param tblName Table name.
      * @param oldTbl Old table configuration.
      * @param newTbl New table configuraiton.
      * @return Operation future.
      */
     public CompletableFuture<Boolean> updateSchemaForTable(
         final UUID tblId,
-        String tblName,
         TableView oldTbl,
         TableView newTbl
     ) {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
index 1da11d9..ae42c64 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
@@ -121,6 +121,9 @@ public class SchemaTableBuilderImpl implements 
SchemaTableBuilder {
 
     /**
      * Validate indices.
+     *
+     * @param indices Table indices.
+     * @param columns Table columns.
      */
     public static void validateIndices(Collection<TableIndex> indices, 
Collection<Column> columns) {
         Set<String> colNames = 
columns.stream().map(Column::name).collect(Collectors.toSet());
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java
index 2125e9f..f0fcecb 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/mapping/ColumnMapping.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.schema.mapping;
 
 import java.io.Serializable;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 
 /**
  * Column mapping helper.
@@ -35,12 +36,44 @@ public class ColumnMapping {
 
     /**
      * @param cols Number of columns.
+     * @return Column mapper builder.
      */
     public static ColumnMapperBuilder mapperBuilder(int cols) {
         return new ColumnMapperImpl(cols);
     }
 
     /**
+     * Builds mapper for given schema via merging schema mapper with the 
provided one.
+     * Used for building columns mapper between arbitrary schema versions with 
bottom-&gt;top approach.
+     *
+     * @param mapping Column mapper.
+     * @param schema Target schema.
+     * @return Merged column mapper.
+     */
+    public static ColumnMapper mergeMapping(ColumnMapper mapping, 
SchemaDescriptor schema) {
+        if (mapping == identityMapping())
+            return schema.columnMapping();
+
+        else if (schema.columnMapping() == identityMapping())
+            return mapping;
+
+        ColumnMapperBuilder builder = mapperBuilder(schema.length());
+
+        ColumnMapper schemaMapper = schema.columnMapping();
+
+        for (int i = 0; i < schema.length(); i++) {
+            int idx = schemaMapper.map(i);
+
+            if (idx < 0)
+                builder.add(i, -1);
+            else
+                builder.add(i, mapping.map(idx));
+        }
+
+        return builder.build();
+    }
+
+    /**
      * Stub.
      */
     private ColumnMapping() {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 10ee084..0dc23cd 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,12 +17,17 @@
 
 package org.apache.ignite.internal.schema.registry;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
+import org.apache.ignite.internal.schema.mapping.ColumnMapping;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -33,7 +38,10 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     public static final int INITIAL_SCHEMA_VERSION = -1;
 
     /** Cached schemas. */
-    private final ConcurrentSkipListMap<Integer, SchemaDescriptor> schemaCache 
= new ConcurrentSkipListMap<>();
+    private final ConcurrentNavigableMap<Integer, SchemaDescriptor> 
schemaCache = new ConcurrentSkipListMap<>();
+
+    /** Column mappers cache. */
+    private final Map<Long, ColumnMapper> mappingCache = new 
ConcurrentHashMap<>();
 
     /** Last registered version. */
     private volatile int lastVer;
@@ -106,9 +114,37 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         if (curSchema.version() == rowSchema.version())
             return new Row(rowSchema, row);
 
-        assert rowSchema.version() == curSchema.version() + 1; // TODO: 
IGNITE-14864 implement merged mapper for arbitraty schema versions.
+        ColumnMapper mapping = resolveMapping(curSchema, rowSchema);
+
+        return new UpgradingRowAdapter(curSchema, row, mapping);
+    }
+
+    /**
+     * @param curSchema Target schema.
+     * @param rowSchema Row schema.
+     * @return Column mapper for target schema.
+     */
+    ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor 
rowSchema) {
+        assert curSchema.version() > rowSchema.version();
+
+        if (curSchema.version() == rowSchema.version() + 1)
+            return curSchema.columnMapping();
+
+        final long mappingKey = (((long)curSchema.version()) << 32) | 
(rowSchema.version());
+
+        ColumnMapper mapping;
 
-        return new UpgradingRowAdapter(curSchema, row, 
rowSchema.columnMapping());
+        if ((mapping = mappingCache.get(mappingKey)) != null)
+            return mapping;
+
+        mapping = schema(rowSchema.version() + 1).columnMapping();
+
+        for (int i = rowSchema.version() + 2; i <= curSchema.version(); i++)
+            mapping = ColumnMapping.mergeMapping(mapping, schema(i));
+
+        mappingCache.putIfAbsent(mappingKey, mapping);
+
+        return mapping;
     }
 
     /**
@@ -145,6 +181,16 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         if (ver >= lastVer || ver <= 0 || schemaCache.keySet().first() < ver)
             throw new SchemaRegistryException("Incorrect schema version to 
clean up to: " + ver);
 
-        schemaCache.remove(ver);
+        if (schemaCache.remove(ver) != null)
+            mappingCache.keySet().removeIf(k -> (k & 0xFFFF_FFFFL) == ver);
+    }
+
+    /**
+     * For test purposes only.
+     *
+     * @return ColumnMapping cache.
+     */
+    Map<Long, ColumnMapper> mappingCache() {
+        return mappingCache;
     }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index 42cf7e6..7fd0167 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
  */
 class UpgradingRowAdapter extends Row {
     /** Column mapper. */
-    private final ColumnMapper mapping;
+    private final ColumnMapper mapper;
 
     /**
      * @param schema Schema descriptor of new version.
@@ -39,12 +39,12 @@ class UpgradingRowAdapter extends Row {
     UpgradingRowAdapter(SchemaDescriptor schema, BinaryRow row, ColumnMapper 
mapper) {
         super(schema, row);
 
-        this.mapping = mapper;
+        this.mapper = mapper;
     }
 
     /** {@inheritDoc} */
     @Override protected long findColumn(int colIdx, NativeTypeSpec type) 
throws InvalidTypeException {
-        int mapIdx = mapping.map(colIdx);
+        int mapIdx = mapper.map(colIdx);
 
         return (mapIdx < 0) ? Long.MIN_VALUE : super.findColumn(mapIdx, type);
     }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
similarity index 90%
rename from 
modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
rename to 
modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 9750aff..8d34981 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema;
+package org.apache.ignite.internal.schema.registry;
 
 import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import 
org.apache.ignite.internal.schema.registry.SchemaRegistrationConflictException;
-import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
-import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.mapping.ColumnMapper;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
@@ -522,6 +523,63 @@ public class SchemaRegistryImplTest {
     }
 
     /**
+     * Check schema cache cleanup.
+     */
+    @Test
+    public void testSchemaCacheCleanup() {
+        UUID tableId = UUID.randomUUID();
+
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(tableId, 1,
+            new Column[]{new Column("keyLongCol", INT64, true)},
+            new Column[]{new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(tableId, 2,
+            new Column[]{new Column("keyLongCol", INT64, true)},
+            new Column[]{
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(tableId, 3,
+            new Column[]{new Column("keyLongCol", INT64, true)},
+            new Column[]{
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(tableId, 4,
+            new Column[]{new Column("keyLongCol", INT64, true)},
+            new Column[]{
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+        Map<Long, ColumnMapper> cache = reg.mappingCache();
+
+        reg.onSchemaRegistered(schemaV1);
+        reg.onSchemaRegistered(schemaV2);
+        reg.onSchemaRegistered(schemaV3);
+        reg.onSchemaRegistered(schemaV4);
+
+        assertEquals(0, cache.size());
+
+        reg.resolveMapping(schemaV4, schemaV1);
+        reg.resolveMapping(schemaV3, schemaV1);
+        reg.resolveMapping(schemaV4, schemaV2);
+
+        assertEquals(3, cache.size());
+
+        reg.onSchemaDropped(schemaV1.version());
+
+        assertEquals(1, cache.size());
+
+        reg.onSchemaDropped(schemaV2.version());
+
+        assertEquals(0, cache.size());
+    }
+
+    /**
      * @param history Table schema history.
      * @return Schema history map.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 939fd07..116d4ea 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -437,7 +437,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             final int ver = tbl.schemaView().lastSchemaVersion() + 1;
 
             if (hasMetastorageLocally)
-                futs.add(schemaMgr.updateSchemaForTable(tblId, tblName, 
oldCfg.get(tblName), newCfg.get(tblName)));
+                futs.add(schemaMgr.updateSchemaForTable(tblId, 
oldCfg.get(tblName), newCfg.get(tblName)));
 
             final CompletableFuture<SchemaEventParameters> schemaReadyFut = 
new CompletableFuture<>();
 

Reply via email to