korlov42 commented on a change in pull request #35:
URL: https://github.com/apache/ignite-3/pull/35#discussion_r576221536



##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * Base class for storage built-in data types definition. The class contains 
predefined values
+ * for fixed-sized types and some of the variable-sized types. Parameterized 
types, such as
+ * bitmask of size <code>n</code> bits or number of max n bytes are created 
using static methods.
+ *
+ * An instance of native type provides necessary indirection to read any field 
as an instance of
+ * {@code java.lang.Object} to avoid switching inside the tuple methods.
+ */
+public enum NativeTypeSpec {
+    /**
+     * Native type representing a single-byte signed value.
+     */
+    BYTE("byte", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.byteValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a two-bytes signed value.
+     */
+    SHORT("short", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.shortValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a four-bytes signed value.
+     */
+    INTEGER("integer", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.intValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an eight-bytes signed value.
+     */
+    LONG("long", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.longValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a four-bytes floating-point value.
+     */
+    FLOAT("float", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.floatValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an eight-bytes floating-point value.
+     */
+    DOUBLE("double", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.doubleValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a UUID.
+     */
+    UUID("uuid", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.uuidValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type respresenting a string.
+     */
+    STRING("string") {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.stringValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an arbitrary byte array.
+     */
+    BYTES("blob") {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.bytesValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a bitmask.
+     */
+    BITMASK("bitmask", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.bitmaskValue(colIdx);
+        }
+    };
+
+    /** Flag indicating whether this type specifies a fixed-length type. */
+    private final boolean fixedSize;
+
+    /** Single-token type description. */
+    private final String desc;
+
+    /**
+     * Constructs a varlength type with the given type description.
+     *
+     * @param desc Type description.
+     */
+    NativeTypeSpec(String desc) {
+        this(desc, false);
+    }
+
+    /**
+     * Constructs a type with the given description and size.
+     *
+     * @param desc Type description.
+     * @param fixedSize Flag indicating whether this type specifies a 
fixed-length type.
+     */
+    NativeTypeSpec(String desc, boolean fixedSize) {
+        this.desc = desc;
+        this.fixedSize = fixedSize;
+    }
+
+    /**
+     * @return {@code true} for fixed-length types, {@code false} otherwise.
+     */
+    public boolean fixedLength() {
+        return fixedSize;
+    }
+
+    /**
+     * Indirection method for getting an Object representation of the given 
type from the tuple. This method
+     * does do any type conversions and will throw an exception if tuple 
schema column type differs from this
+     * type.
+     *
+     * @param tup Tuple to read the value from.
+     * @param colIdx Column index to read.
+     * @return An Object representation of the value.
+     * @throws InvalidTypeException If this native type differs from the 
actual type of {@code colIdx}.
+     */
+    public abstract Object objectValue(Tuple tup, int colIdx) throws 
InvalidTypeException;
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NativeType [desc=" + desc + ", size=" + (fixedLength() ? 
fixedSize : "varlen") + ']';

Review comment:
       ```suggestion
           return "NativeType [desc=" + desc + ", fixedSize=" + fixedSize + ']';
   ```

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Tuple.java
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * The class contains non-generic methods to read boxed and unboxed primitives 
based on the schema column types.
+ * Any type conversions and coersions should be implemented outside of the 
tuple by the key-value or query runtime.
+ * When a non-boxed primitive is read from a null column value, it is 
converted to the primitive type default value.
+ */
+public abstract class Tuple {
+    /** */
+    public static final int SCHEMA_VERSION_FIELD_SIZE = 2;
+
+    /** */
+    public static final int KEY_HASH_FIELD_SIZE = 4;
+
+    /** */
+    public static final int TOTAL_LEN_FIELD_SIZE = 2;
+
+    /** */
+    public static final int VARSIZE_TABLE_LEN_FIELD_SIZE = 2;
+
+    /** Schema descriptor for which this tuple was created. */
+    private final SchemaDescriptor schema;
+
+    /**
+     * @param schema Schema instance.
+     */
+    protected Tuple(SchemaDescriptor schema) {
+        this.schema = schema;
+    }
+
+    /**
+     */
+    public byte byteValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? 0 : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public Byte byteValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? null : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public short shortValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? 0 : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public Short shortValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? null : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public int intValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? 0 : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public Integer intValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? null : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public long longValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? 0 : readLong(offset(off));
+    }
+
+    /**
+     */
+    public Long longValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? null : readLong(offset(off));
+    }
+
+    /**
+     */
+    public float floatValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? 0.f : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public Float floatValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? null : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public double doubleValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? 0.d : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public Double doubleValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? null : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public String stringValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.STRING);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readString(off, len);
+    }
+
+    /**
+     */
+    public byte[] bytesValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.BYTES);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readBytes(off, len);
+    }
+
+    /**
+     */
+    public UUID uuidValue(int col) {
+        long found = findColumn(col, NativeTypeSpec.UUID);
+
+        if (found < 0)
+            return null;
+
+        int off = offset(found);
+
+        long lsb = readLong(off);
+        long msb = readLong(off + 8);
+
+        return new UUID(msb, lsb);
+    }
+
+    /**
+     */
+    public BitSet bitmaskValue(int colIdx) {
+        long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+
+        Column col = schema.column(colIdx);
+
+        return BitSet.valueOf(readBytes(off, col.type().length()));
+    }
+
+    /**
+     * Gets the column offset and length encoded into a single 8-byte value (4 
least significant bytes encoding the
+     * offset from the beginning of the tuple and 4 most significant bytes 
encoding the field length for varlength
+     * columns). The offset and length should be extracted using {@link 
#offset(long)} and {@link #length(long)}
+     * methods.
+     * Will also validate that the actual column type matches the requested 
column type, throwing
+     * {@link InvalidTypeException} if the types do not match.
+     *
+     * @param colIdx Column index.
+     * @param type Expected column type.
+     * @return Encoded offset + length of the column.
+     * @see #offset(long)
+     * @see #length(long)
+     * @see InvalidTypeException If actual column type does not match the 
requested column type.
+     */
+    private long findColumn(int colIdx, NativeTypeSpec type) {
+        // Get base offset (key start or value start) for the given column.
+        boolean keyCol = schema.keyColumn(colIdx);
+        Columns cols = schema.columns(colIdx);
+
+        int off = SCHEMA_VERSION_FIELD_SIZE + KEY_HASH_FIELD_SIZE;
+
+        if (!keyCol) {
+            // Jump to the next chunk, the size of the first chunk is written 
at the chunk start.
+            off += readShort(off);
+
+            // Adjust the column index according to the number of key columns.
+            colIdx -= schema.keyColumns().length();
+        }
+
+        Column col = cols.column(colIdx);
+
+        if (col.type().spec() != type)
+            throw new InvalidTypeException("Invalid column type requested 
[requested=" + type +
+                ", column=" + col + ']');
+
+        if (isNull(off, colIdx))
+            return -1;
+
+        return type.fixedLength() ?
+            fixlenColumnOffset(cols, off, colIdx) :
+            varlenColumnOffsetAndLength(cols, off, colIdx);
+    }
+
+    /**
+     * Checks the typle null map for the given column index in the chunk.
+     *
+     * @param baseOff Offset of the chunk start in the tuple.
+     * @param idx Offset of the column in the chunk.
+     * @return {@code true} if the column value is {@code null}.
+     */
+    private boolean isNull(int baseOff, int idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullByte = idx / 8;
+        int posInByte = idx % 8;
+
+        int map = readByte(nullMapOff + nullByte);
+
+        return (map & (1 << posInByte)) != 0;
+    }
+
+    /**
+     * Utility method to extract the column offset from the {@link 
#findColumn(int, NativeTypeSpec)} result. The
+     * offset is calculated from the beginning of the tuple.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Column offset from the beginning of the tuple.
+     */
+    private static int offset(long offLen) {
+        return (int)offLen;
+    }
+
+    /**
+     * Utility method to extract the column length from the {@link 
#findColumn(int, NativeTypeSpec)} result for
+     * varlength columns.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Length of the column or {@code 0} if the column is fixed-length.
+     */
+    private static int length(long offLen) {
+        return (int)(offLen >>> 32);
+    }
+
+    /**
+     * Calculates the offset and length of varlen column. First, it calculates 
the number of non-null columns
+     * preceeding the requested column by folding the null map bits. This 
number is used to adjust the column index
+     * and find the corresponding entry in the varlen table. The length of the 
column is calculated either by
+     * subtracting two adjacent varlen table offsets, or by subtracting the 
last varlen table offset from the chunk
+     * length.
+     *
+     * @param cols Columns chunk.
+     * @param baseOff Chunk base offset.
+     * @param idx Column index in the chunk.
+     * @return Encoded offset (from the tuple start) and length of the column 
with the given index.
+     */
+    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int 
idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullStartByte = cols.firstVarlengthColumn() / 8;
+        int startBitInByte = cols.firstVarlengthColumn() % 8;
+
+        int nullEndByte = idx / 8;
+        int endBitInByte = idx % 8;
+        int numNullsBefore = 0;
+
+        for (int i = nullStartByte; i <= nullEndByte; i++) {
+            int nullmapByte = readByte(nullMapOff + i);
+
+            if (i == nullStartByte)
+                // We need to clear startBitInByte least significant bits
+                nullmapByte &= (0xFF << startBitInByte);
+
+            if (i == nullEndByte)
+                // We need to clear 8-endBitInByte most significant bits
+                nullmapByte &= (0xFF >> (8 - endBitInByte));
+
+            numNullsBefore += Columns.numberOfNullColumns(nullmapByte);
+        }
+
+        idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
+        int vartableSize = readShort(baseOff + TOTAL_LEN_FIELD_SIZE);
+
+        int vartableOff = vartableOffset(baseOff);
+        // Offset of idx-th column is from base offset.
+        int resOff = readShort(vartableOff + 2 * idx);
+
+        long len = idx == vartableSize - 1 ?
+            // totalLength - columnStartOffset
+            readShort(baseOff) - resOff :
+            // nextColumnStartOffset - columnStartOffset
+            readShort(vartableOff + 2 * (idx + 1)) - resOff;
+
+        return (len << 32) | (resOff + baseOff);
+    }
+
+    /**
+     * Calculates the offset of the fixlen column with the given index in the 
tuple. It essentially folds the null map
+     * with the column lengths to calculate the size of non-null columns 
preceeding the requested column.
+     *
+     * @param cols Columns chunk.
+     * @param baseOff Chunk base offset.
+     * @param idx Column index in the chunk.
+     * @return Encoded offset (from the tuple start) of the requested fixlen 
column.
+     */
+    int fixlenColumnOffset(Columns cols, int baseOff, int idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int off = 0;
+        int nullMapIdx = idx / 8;
+
+        // Fold offset based on the whole map bytes in the schema
+        for (int i = 0; i < nullMapIdx; i++)
+            off += cols.foldFixedLength(i, readByte(nullMapOff + i));
+
+        // Set bits starting from posInByte, inclusive, up to either the end 
of the byte or the last column index, inclusive
+        int startBit = idx % 8;
+        int endBit = nullMapIdx == cols.nullMapSize() - 1 ? 
((cols.numberOfFixsizeColumns() - 1) % 8) : 7;
+        int mask = (0xFF >> (7 - endBit)) & (0xFF << startBit);
+
+        off += cols.foldFixedLength(nullMapIdx, readByte(nullMapOff + 
nullMapIdx) | mask);
+
+        return nullMapOff + cols.nullMapSize() + off;
+    }
+
+    /**
+     * @param baseOff Chunk base offset.
+     * @return Null map offset from the tuple start for the chunk with the 
given base.
+     */
+    private int nullMapOffset(int baseOff) {
+        int varlenTblLen = readShort(baseOff + TOTAL_LEN_FIELD_SIZE) * 2;
+
+        return vartableOffset(baseOff) + varlenTblLen;
+    }
+
+    /**
+     * @param baseOff Chunk base offset.
+     * @return Offset of the varlen table from the tuple start for the chunk 
with the given base.
+     */
+    private int vartableOffset(int baseOff) {
+        return baseOff + TOTAL_LEN_FIELD_SIZE + VARSIZE_TABLE_LEN_FIELD_SIZE;
+    }
+
+    /**
+     */
+    protected abstract int readByte(int off);

Review comment:
       Why do both the `readByte` and `readShort` methods return `int`?

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * Full schema descriptor containing key columns chunk, value columns chunk, 
and schema version.
+ */
+public class SchemaDescriptor {
+    /** Schema version. Incremented on each schema modification. */
+    private final int ver;
+
+    /**
+     * Key columns in serialization order.
+     */
+    private final Columns keyCols;
+
+    /**
+     * Value columns in serialization order.
+     */
+    private final Columns valCols;
+
+    /**
+     * @param ver Schema version.
+     * @param keyCols Key columns.
+     * @param valCols Value columns.
+     */
+    public SchemaDescriptor(int ver, Columns keyCols, Columns valCols) {
+        this.ver = ver;
+        this.keyCols = keyCols;
+        this.valCols = valCols;
+    }
+
+    /**
+     * @return Schema version.
+     */
+    public int version() {
+        return ver;
+    }
+
+    /**
+     * @param idx Index to check.
+     * @return {@code true} if the column belongs to the key chunk.
+     */
+    public boolean keyColumn(int idx) {
+        return idx < keyCols.length();
+    }
+
+    /**
+     * @param col Column index.
+     * @return Column chunk for the given column index.
+     */
+    public Columns columns(int col) {

Review comment:
       I'm not sure this method should be a part of `SchemaDescriptor`. It looks 
like all invocations could simply be covered by direct calls to `keyColumns()` 
and `valueColumns()`.
   
   BTW, is it possible to have a schema with an empty set of key columns and a 
non-empty set of value columns?

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Tuple.java
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * The class contains non-generic methods to read boxed and unboxed primitives 
based on the schema column types.
+ * Any type conversions and coersions should be implemented outside of the 
tuple by the key-value or query runtime.
+ * When a non-boxed primitive is read from a null column value, it is 
converted to the primitive type default value.
+ */
+public abstract class Tuple {
+    /** */
+    public static final int SCHEMA_VERSION_FIELD_SIZE = 2;
+
+    /** */
+    public static final int KEY_HASH_FIELD_SIZE = 4;
+
+    /** */
+    public static final int TOTAL_LEN_FIELD_SIZE = 2;
+
+    /** */
+    public static final int VARSIZE_TABLE_LEN_FIELD_SIZE = 2;
+
+    /** Schema descriptor for which this tuple was created. */
+    private final SchemaDescriptor schema;
+
+    /**
+     * @param schema Schema instance.
+     */
+    protected Tuple(SchemaDescriptor schema) {
+        this.schema = schema;
+    }
+
+    /**
+     */
+    public byte byteValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? 0 : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public Byte byteValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? null : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public short shortValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? 0 : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public Short shortValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? null : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public int intValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? 0 : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public Integer intValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? null : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public long longValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? 0 : readLong(offset(off));
+    }
+
+    /**
+     */
+    public Long longValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? null : readLong(offset(off));
+    }
+
+    /**
+     */
+    public float floatValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? 0.f : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public Float floatValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? null : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public double doubleValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? 0.d : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public Double doubleValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? null : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public String stringValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.STRING);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readString(off, len);
+    }
+
+    /**
+     */
+    public byte[] bytesValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.BYTES);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readBytes(off, len);
+    }
+
+    /**
+     */
+    public UUID uuidValue(int col) {
+        long found = findColumn(col, NativeTypeSpec.UUID);
+
+        if (found < 0)
+            return null;
+
+        int off = offset(found);
+
+        long lsb = readLong(off);
+        long msb = readLong(off + 8);
+
+        return new UUID(msb, lsb);
+    }
+
+    /**
+     */
+    public BitSet bitmaskValue(int colIdx) {
+        long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+
+        Column col = schema.column(colIdx);
+
+        return BitSet.valueOf(readBytes(off, col.type().length()));
+    }
+
+    /**
+     * Gets the column offset and length encoded into a single 8-byte value (4 
least significant bytes encoding the
+     * offset from the beginning of the tuple and 4 most significant bytes 
encoding the field length for varlength
+     * columns). The offset and length should be extracted using {@link 
#offset(long)} and {@link #length(long)}
+     * methods.
+     * Will also validate that the actual column type matches the requested 
column type, throwing
+     * {@link InvalidTypeException} if the types do not match.
+     *
+     * @param colIdx Column index.
+     * @param type Expected column type.
+     * @return Encoded offset + length of the column.
+     * @see #offset(long)
+     * @see #length(long)
+     * @see InvalidTypeException If actual column type does not match the 
requested column type.
+     */
+    private long findColumn(int colIdx, NativeTypeSpec type) {
+        // Get base offset (key start or value start) for the given column.
+        boolean keyCol = schema.keyColumn(colIdx);
+        Columns cols = schema.columns(colIdx);
+
+        int off = SCHEMA_VERSION_FIELD_SIZE + KEY_HASH_FIELD_SIZE;
+
+        if (!keyCol) {
+            // Jump to the next chunk, the size of the first chunk is written 
at the chunk start.
+            off += readShort(off);
+
+            // Adjust the column index according to the number of key columns.
+            colIdx -= schema.keyColumns().length();
+        }
+
+        Column col = cols.column(colIdx);
+
+        if (col.type().spec() != type)
+            throw new InvalidTypeException("Invalid column type requested 
[requested=" + type +
+                ", column=" + col + ']');
+
+        if (isNull(off, colIdx))
+            return -1;
+
+        return type.fixedLength() ?
+            fixlenColumnOffset(cols, off, colIdx) :
+            varlenColumnOffsetAndLength(cols, off, colIdx);
+    }
+
+    /**
+     * Checks the typle null map for the given column index in the chunk.
+     *
+     * @param baseOff Offset of the chunk start in the tuple.
+     * @param idx Offset of the column in the chunk.
+     * @return {@code true} if the column value is {@code null}.
+     */
+    private boolean isNull(int baseOff, int idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullByte = idx / 8;
+        int posInByte = idx % 8;
+
+        int map = readByte(nullMapOff + nullByte);
+
+        return (map & (1 << posInByte)) != 0;
+    }
+
+    /**
+     * Utility method to extract the column offset from the {@link 
#findColumn(int, NativeTypeSpec)} result. The
+     * offset is calculated from the beginning of the tuple.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Column offset from the beginning of the tuple.
+     */
+    private static int offset(long offLen) {
+        return (int)offLen;
+    }
+
+    /**
+     * Utility method to extract the column length from the {@link 
#findColumn(int, NativeTypeSpec)} result for
+     * varlength columns.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Length of the column or {@code 0} if the column is fixed-length.
+     */
+    private static int length(long offLen) {
+        return (int)(offLen >>> 32);
+    }
+
+    /**
+     * Calculates the offset and length of varlen column. First, it calculates 
the number of non-null columns
+     * preceeding the requested column by folding the null map bits. This 
number is used to adjust the column index
+     * and find the corresponding entry in the varlen table. The length of the 
column is calculated either by
+     * subtracting two adjacent varlen table offsets, or by subtracting the 
last varlen table offset from the chunk
+     * length.
+     *
+     * @param cols Columns chunk.
+     * @param baseOff Chunk base offset.
+     * @param idx Column index in the chunk.
+     * @return Encoded offset (from the tuple start) and length of the column 
with the given index.
+     */
+    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int 
idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullStartByte = cols.firstVarlengthColumn() / 8;
+        int startBitInByte = cols.firstVarlengthColumn() % 8;
+
+        int nullEndByte = idx / 8;
+        int endBitInByte = idx % 8;
+        int numNullsBefore = 0;
+
+        for (int i = nullStartByte; i <= nullEndByte; i++) {
+            int nullmapByte = readByte(nullMapOff + i);
+
+            if (i == nullStartByte)
+                // We need to clear startBitInByte least significant bits
+                nullmapByte &= (0xFF << startBitInByte);
+
+            if (i == nullEndByte)
+                // We need to clear 8-endBitInByte most significant bits
+                nullmapByte &= (0xFF >> (8 - endBitInByte));
+
+            numNullsBefore += Columns.numberOfNullColumns(nullmapByte);
+        }
+
+        idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
+        int vartableSize = readShort(baseOff + TOTAL_LEN_FIELD_SIZE);
+
+        int vartableOff = vartableOffset(baseOff);
+        // Offset of idx-th column is from base offset.
+        int resOff = readShort(vartableOff + 2 * idx);
+
+        long len = idx == vartableSize - 1 ?
+            // totalLength - columnStartOffset
+            readShort(baseOff) - resOff :
+            // nextColumnStartOffset - columnStartOffset
+            readShort(vartableOff + 2 * (idx + 1)) - resOff;
+
+        return (len << 32) | (resOff + baseOff);
+    }
+
+    /**
+     * Calculates the offset of the fixlen column with the given index in the 
tuple. It essentially folds the null map
+     * with the column lengths to calculate the size of non-null columns 
preceeding the requested column.

Review comment:
       ```suggestion
        * with the column lengths to calculate the size of non-null columns 
preceding the requested column.
   ```

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Tuple.java
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * The class contains non-generic methods to read boxed and unboxed primitives 
based on the schema column types.
+ * Any type conversions and coercions should be implemented outside of the 
tuple by the key-value or query runtime.
+ * When a non-boxed primitive is read from a null column value, it is 
converted to the primitive type default value.
+ */
+public abstract class Tuple {
+    /** */
+    public static final int SCHEMA_VERSION_FIELD_SIZE = 2;
+
+    /** */
+    public static final int KEY_HASH_FIELD_SIZE = 4;
+
+    /** */
+    public static final int TOTAL_LEN_FIELD_SIZE = 2;
+
+    /** */
+    public static final int VARSIZE_TABLE_LEN_FIELD_SIZE = 2;
+
+    /** Schema descriptor for which this tuple was created. */
+    private final SchemaDescriptor schema;
+
+    /**
+     * @param schema Schema instance.
+     */
+    protected Tuple(SchemaDescriptor schema) {
+        this.schema = schema;
+    }
+
+    /**
+     */
+    public byte byteValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? 0 : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public Byte byteValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? null : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public short shortValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? 0 : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public Short shortValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? null : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public int intValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? 0 : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public Integer intValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? null : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public long longValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? 0 : readLong(offset(off));
+    }
+
+    /**
+     */
+    public Long longValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? null : readLong(offset(off));
+    }
+
+    /**
+     */
+    public float floatValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? 0.f : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public Float floatValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? null : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public double doubleValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? 0.d : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public Double doubleValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? null : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public String stringValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.STRING);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readString(off, len);
+    }
+
+    /**
+     */
+    public byte[] bytesValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.BYTES);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readBytes(off, len);
+    }
+
+    /**
+     */
+    public UUID uuidValue(int col) {
+        long found = findColumn(col, NativeTypeSpec.UUID);
+
+        if (found < 0)
+            return null;
+
+        int off = offset(found);
+
+        long lsb = readLong(off);
+        long msb = readLong(off + 8);
+
+        return new UUID(msb, lsb);
+    }
+
+    /**
+     */
+    public BitSet bitmaskValue(int colIdx) {
+        long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+
+        Column col = schema.column(colIdx);
+
+        return BitSet.valueOf(readBytes(off, col.type().length()));
+    }
+
+    /**
+     * Gets the column offset and length encoded into a single 8-byte value
+     * (4 least significant bytes encoding the offset from the beginning of
+     * the tuple and 4 most significant bytes encoding the field length for
+     * varlength columns). The offset and length should be extracted using
+     * {@link #offset(long)} and {@link #length(long)} methods.
+     * Will also validate that the actual column type matches the requested
+     * column type.
+     *
+     * @param colIdx Column index.
+     * @param type Expected column type.
+     * @return Encoded offset + length of the column, or {@code -1} if the
+     *      column value is {@code null}.
+     * @throws InvalidTypeException If actual column type does not match the
+     *      requested column type.
+     * @see #offset(long)
+     * @see #length(long)
+     */
+    private long findColumn(int colIdx, NativeTypeSpec type) {
+        // Get base offset (key start or value start) for the given column.
+        boolean keyCol = schema.keyColumn(colIdx);
+        Columns cols = schema.columns(colIdx);
+
+        int off = SCHEMA_VERSION_FIELD_SIZE + KEY_HASH_FIELD_SIZE;
+
+        if (!keyCol) {
+            // Jump to the next chunk, the size of the first chunk is written 
at the chunk start.
+            off += readShort(off);
+
+            // Adjust the column index according to the number of key columns.
+            colIdx -= schema.keyColumns().length();
+        }
+
+        Column col = cols.column(colIdx);
+
+        if (col.type().spec() != type)
+            throw new InvalidTypeException("Invalid column type requested 
[requested=" + type +
+                ", column=" + col + ']');
+
+        // Null columns are reported with a negative sentinel; callers test "result < 0".
+        if (isNull(off, colIdx))
+            return -1;
+
+        return type.fixedLength() ?
+            fixlenColumnOffset(cols, off, colIdx) :
+            varlenColumnOffsetAndLength(cols, off, colIdx);
+    }
+
+    /**
+     * Checks the tuple's null map for the given column index in the chunk.
+     *
+     * @param baseOff Offset of the chunk start in the tuple.
+     * @param idx Index of the column in the chunk.
+     * @return {@code true} if the column value is {@code null}.
+     */
+    private boolean isNull(int baseOff, int idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        // One bit per column: byte number is idx / 8, bit number inside that byte is idx % 8.
+        int nullByte = idx / 8;
+        int posInByte = idx % 8;
+
+        int map = readByte(nullMapOff + nullByte);
+
+        // A set bit marks the column as null.
+        return (map & (1 << posInByte)) != 0;
+    }
+
+    /**
+     * Utility method to extract the column offset from the {@link 
#findColumn(int, NativeTypeSpec)} result. The
+     * offset is calculated from the beginning of the tuple.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Column offset from the beginning of the tuple.
+     */
+    private static int offset(long offLen) {
+        return (int)offLen;
+    }
+
+    /**
+     * Utility method to extract the column length from the {@link 
#findColumn(int, NativeTypeSpec)} result for
+     * varlength columns.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Length of the column or {@code 0} if the column is fixed-length.
+     */
+    private static int length(long offLen) {
+        return (int)(offLen >>> 32);
+    }
+
+    /**
+     * Calculates the offset and length of varlen column. First, it calculates 
the number of non-null columns
+     * preceeding the requested column by folding the null map bits. This 
number is used to adjust the column index

Review comment:
       ```suggestion
        * preceding the requested column by folding the null map bits. This 
number is used to adjust the column index
   ```

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Tuple.java
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * The class contains non-generic methods to read boxed and unboxed primitives 
based on the schema column types.
+ * Any type conversions and coercions should be implemented outside of the 
tuple by the key-value or query runtime.
+ * When a non-boxed primitive is read from a null column value, it is 
converted to the primitive type default value.
+ */
+public abstract class Tuple {
+    /** */
+    public static final int SCHEMA_VERSION_FIELD_SIZE = 2;
+
+    /** */
+    public static final int KEY_HASH_FIELD_SIZE = 4;
+
+    /** */
+    public static final int TOTAL_LEN_FIELD_SIZE = 2;
+
+    /** */
+    public static final int VARSIZE_TABLE_LEN_FIELD_SIZE = 2;
+
+    /** Schema descriptor for which this tuple was created. */
+    private final SchemaDescriptor schema;
+
+    /**
+     * @param schema Schema instance.
+     */
+    protected Tuple(SchemaDescriptor schema) {
+        this.schema = schema;
+    }
+
+    /**
+     */
+    public byte byteValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? 0 : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public Byte byteValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? null : (byte)readByte(offset(off));
+    }
+
+    /**
+     */
+    public short shortValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? 0 : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public Short shortValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? null : (short)readShort(offset(off));
+    }
+
+    /**
+     */
+    public int intValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? 0 : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public Integer intValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? null : readInteger(offset(off));
+    }
+
+    /**
+     */
+    public long longValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? 0 : readLong(offset(off));
+    }
+
+    /**
+     */
+    public Long longValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? null : readLong(offset(off));
+    }
+
+    /**
+     */
+    public float floatValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? 0.f : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public Float floatValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? null : readFloat(offset(off));
+    }
+
+    /**
+     */
+    public double doubleValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? 0.d : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public Double doubleValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? null : readDouble(offset(off));
+    }
+
+    /**
+     */
+    public String stringValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.STRING);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readString(off, len);
+    }
+
+    /**
+     */
+    public byte[] bytesValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.BYTES);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readBytes(off, len);
+    }
+
+    /**
+     */
+    public UUID uuidValue(int col) {
+        long found = findColumn(col, NativeTypeSpec.UUID);
+
+        if (found < 0)
+            return null;
+
+        int off = offset(found);
+
+        long lsb = readLong(off);
+        long msb = readLong(off + 8);
+
+        return new UUID(msb, lsb);
+    }
+
+    /**
+     */
+    public BitSet bitmaskValue(int colIdx) {
+        long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+
+        Column col = schema.column(colIdx);
+
+        return BitSet.valueOf(readBytes(off, col.type().length()));
+    }
+
+    /**
+     * Gets the column offset and length encoded into a single 8-byte value
+     * (4 least significant bytes encoding the offset from the beginning of
+     * the tuple and 4 most significant bytes encoding the field length for
+     * varlength columns). The offset and length should be extracted using
+     * {@link #offset(long)} and {@link #length(long)} methods.
+     * Will also validate that the actual column type matches the requested
+     * column type.
+     *
+     * @param colIdx Column index.
+     * @param type Expected column type.
+     * @return Encoded offset + length of the column, or {@code -1} if the
+     *      column value is {@code null}.
+     * @throws InvalidTypeException If actual column type does not match the
+     *      requested column type.
+     * @see #offset(long)
+     * @see #length(long)
+     */
+    private long findColumn(int colIdx, NativeTypeSpec type) {
+        // Get base offset (key start or value start) for the given column.
+        boolean keyCol = schema.keyColumn(colIdx);
+        Columns cols = schema.columns(colIdx);
+
+        int off = SCHEMA_VERSION_FIELD_SIZE + KEY_HASH_FIELD_SIZE;
+
+        if (!keyCol) {
+            // Jump to the next chunk, the size of the first chunk is written 
at the chunk start.
+            off += readShort(off);
+
+            // Adjust the column index according to the number of key columns.
+            colIdx -= schema.keyColumns().length();
+        }
+
+        Column col = cols.column(colIdx);
+
+        if (col.type().spec() != type)
+            throw new InvalidTypeException("Invalid column type requested 
[requested=" + type +
+                ", column=" + col + ']');
+
+        // Null columns are reported with a negative sentinel; callers test "result < 0".
+        if (isNull(off, colIdx))
+            return -1;
+
+        return type.fixedLength() ?
+            fixlenColumnOffset(cols, off, colIdx) :
+            varlenColumnOffsetAndLength(cols, off, colIdx);
+    }
+
+    /**
+     * Checks the typle null map for the given column index in the chunk.

Review comment:
       ```suggestion
        * Checks the tuple's null map for the given column index in the chunk.
   ```

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/NativeType.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * A thin wrapper over {@link NativeTypeSpec} to instantiate parameterized 
constrained types.
+ */
+public class NativeType implements Comparable<NativeType> {
+    /** */
+    public static final NativeType BYTE = new NativeType(NativeTypeSpec.BYTE, 
1);
+
+    /** */
+    public static final NativeType SHORT = new 
NativeType(NativeTypeSpec.SHORT, 2);
+
+    /** */
+    public static final NativeType INTEGER = new 
NativeType(NativeTypeSpec.INTEGER, 4);
+
+    /** */
+    public static final NativeType LONG = new NativeType(NativeTypeSpec.LONG, 
8);
+
+    /** */
+    public static final NativeType FLOAT = new 
NativeType(NativeTypeSpec.FLOAT, 4);
+
+    /** */
+    public static final NativeType DOUBLE = new 
NativeType(NativeTypeSpec.DOUBLE, 8);
+
+    /** */
+    public static final NativeType UUID = new NativeType(NativeTypeSpec.UUID, 
16);
+
+    /** */
+    public static final NativeType STRING = new 
NativeType(NativeTypeSpec.STRING);
+
+    /** */
+    public static final NativeType BYTES = new 
NativeType(NativeTypeSpec.BYTES);
+
+    /** */
+    private final NativeTypeSpec typeSpec;
+
+    /** Type length. */
+    private int len;

Review comment:
       it's better to make it final

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Columns.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/**
+ * A set of columns representing a key or a value chunk in tuple. Instance of 
Columns provides necessary machinery
+ * to locate a column value in a concrete tuple.
+ */
+public class Columns {
+    /** */
+    public static final int[][] EMPTY_FOLDING_TABLE = new int[0][];
+
+    /** */
+    public static final int[] EMPTY_FOLDING_MASK = new int[0];
+
+    /**
+     * Lookup table to speed-up calculation of the number of null/non-null 
columns based on the null map.
+     * For a given byte {@code b}, {@code NULL_COLUMNS_LOOKUP[b]} will contain 
the number of {@code null} columns
+     * corresponding to the byte in nullability map.
+     * For example, if nullability map is {@code 0b00100001}, then the map 
encodes nulls for columns 0 and 5 and
+     * {@code NULL_COLUMNS_LOOKUP[0b00100001] == 2}.
+     */
+    private static final int[] NULL_COLUMNS_LOOKUP;
+
+    /**
+     * Columns in packed order for this chunk.
+     */
+    private final Column[] cols;
+
+    /**
+     * If the type contains varlength columns, this field will contain an 
index of the first such column.
+     * Otherwise, it will contain {@code -1}.
+     */
+    private final int firstVarlenColIdx;
+
+    /**
+     * Number of bytes required to store the nullability map for this chunk.
+     */
+    private final int nullMapSize;
+
+    /**
+     * Fixed-size column length folding table. The table is used to quickly 
calculate the offset of a fixed-length
+     * column based on the nullability map.
+     */
+    private int[][] foldingTbl;
+
+    /**
+     * Additional mask values for folding table to cut off nullability map for 
columns with larger indexes.
+     */
+    private int[] foldingMask;
+
+    // Fills the null-count lookup table: entry b holds the number of set bits of byte value b.
+    static {
+        NULL_COLUMNS_LOOKUP = new int[256];
+
+        // Each nonzero bit is a null value. Every byte value must be covered, including 0xFF
+        // (all eight columns are null); stopping at 255 would leave that entry at 0.
+        for (int i = 0; i < NULL_COLUMNS_LOOKUP.length; i++)
+            NULL_COLUMNS_LOOKUP[i] = Integer.bitCount(i);
+    }
+
+    /**
+     * Gets a number of null columns for the given byte from the nullability 
map (essentially, the number of non-zero
+     * bits in the given byte).
+     *
+     * @param nullMapByte Byte from a nullability map.
+     * @return Number of null columns for the given byte.
+     */
+    public static int numberOfNullColumns(int nullMapByte) {

Review comment:
       I would prefer to have a byte value as `byte` type.

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/NativeType.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * A thin wrapper over {@link NativeTypeSpec} to instantiate parameterized 
constrained types.
+ */
+public class NativeType implements Comparable<NativeType> {
+    /** */
+    public static final NativeType BYTE = new NativeType(NativeTypeSpec.BYTE, 
1);
+
+    /** */
+    public static final NativeType SHORT = new 
NativeType(NativeTypeSpec.SHORT, 2);
+
+    /** */
+    public static final NativeType INTEGER = new 
NativeType(NativeTypeSpec.INTEGER, 4);
+
+    /** */
+    public static final NativeType LONG = new NativeType(NativeTypeSpec.LONG, 
8);
+
+    /** */
+    public static final NativeType FLOAT = new 
NativeType(NativeTypeSpec.FLOAT, 4);
+
+    /** */
+    public static final NativeType DOUBLE = new 
NativeType(NativeTypeSpec.DOUBLE, 8);
+
+    /** */
+    public static final NativeType UUID = new NativeType(NativeTypeSpec.UUID, 
16);
+
+    /** */
+    public static final NativeType STRING = new 
NativeType(NativeTypeSpec.STRING);
+
+    /** */
+    public static final NativeType BYTES = new 
NativeType(NativeTypeSpec.BYTES);
+
+    /** */
+    private final NativeTypeSpec typeSpec;
+
+    /** Type length. */
+    private int len;
+
+    /**
+     */
+    protected NativeType(NativeTypeSpec typeSpec, int len) {
+        if (!typeSpec.fixedLength())
+            throw new IllegalArgumentException("Size must be provided only for 
fixed-length types: " + typeSpec);
+
+        if (len <= 0)
+            throw new IllegalArgumentException("Size must be positive 
[typeSpec=" + typeSpec + ", size=" + len + ']');
+
+        this.typeSpec = typeSpec;
+        this.len = len;
+    }
+
+    /**
+     */
+    protected NativeType(NativeTypeSpec typeSpec) {
+        if (typeSpec.fixedLength())
+            throw new IllegalArgumentException("Fixed-length types must be 
created by the " +
+                "length-aware constructor: " + typeSpec);
+
+        this.typeSpec = typeSpec;
+    }
+
+    /**
+     * @return Length of the type if it is a fixlen type. For varlen types the 
return value is undefined, so the user
+     * should explicitly check {@code spec().fixedLength()} before using this 
method.
+     *
+     * @see NativeTypeSpec#fixedLength()
+     */
+    public int length() {
+        return len;
+    }
+
+    /**
+     * @return Type specification enum.
+     */
+    public NativeTypeSpec spec() {
+        return typeSpec;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        NativeType that = (NativeType)o;
+
+        return len == that.len && typeSpec == that.typeSpec;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = typeSpec.hashCode();
+
+        res = 31 * res + len;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(NativeType o) {
+        boolean fixed = len > 0;
+        boolean otherFixed = o.len > 0;
+
+        // Types with a positive length are ordered before types without one.
+        if (fixed != otherFixed)
+            return fixed ? -1 : 1;
+
+        // Same kind on both sides: order by length first, then by the spec enum name.
+        int byLen = Integer.compare(len, o.len);
+
+        return byLen != 0 ? byLen : typeSpec.name().compareTo(o.typeSpec.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NativeType{" +

Review comment:
       I think it's better to have one style for string representation across all 
objects.
   
   Following one from NativeTypeSpec, for example:
   `return "NativeType [desc=" + desc + ", size=" + (fixedLength() ? fixedSize 
: "varlen") + ']'` 
   
   BTW, they both have the same name.

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * Base class for storage built-in data types definition. The class contains 
predefined values
+ * for fixed-sized types and some of the variable-sized types. Parameterized 
types, such as
+ * bitmask of size <code>n</code> bits or number of max n bytes are created 
using static methods.
+ *
+ * An instance of native type provides necessary indirection to read any field 
as an instance of
+ * {@code java.lang.Object} to avoid switching inside the tuple methods.
+ */
+public enum NativeTypeSpec {
+    /**
+     * Native type representing a single-byte signed value.
+     */
+    BYTE("byte", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.byteValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a two-bytes signed value.
+     */
+    SHORT("short", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.shortValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a four-bytes signed value.
+     */
+    INTEGER("integer", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.intValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an eight-bytes signed value.
+     */
+    LONG("long", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.longValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a four-bytes floating-point value.
+     */
+    FLOAT("float", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.floatValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an eight-bytes floating-point value.
+     */
+    DOUBLE("double", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.doubleValueBoxed(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a UUID.
+     */
+    UUID("uuid", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.uuidValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a string.
+     */
+    STRING("string") {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.stringValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing an arbitrary byte array.
+     */
+    BYTES("blob") {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.bytesValue(colIdx);
+        }
+    },
+
+    /**
+     * Native type representing a bitmask.
+     */
+    BITMASK("bitmask", true) {
+        /** {@inheritDoc} */
+        @Override public Object objectValue(Tuple tup, int colIdx) {
+            return tup.bitmaskValue(colIdx);
+        }
+    };
+
+    /** Flag indicating whether this type specifies a fixed-length type. */
+    private final boolean fixedSize;
+
+    /** Single-token type description. */
+    private final String desc;
+
+    /**
+     * Constructs a varlength type with the given type description.
+     *
+     * @param desc Type description.
+     */
+    NativeTypeSpec(String desc) {
+        this(desc, false);
+    }
+
+    /**
+     * Constructs a type with the given description and size.
+     *
+     * @param desc Type description.
+     * @param fixedSize Flag indicating whether this type specifies a 
fixed-length type.
+     */
+    NativeTypeSpec(String desc, boolean fixedSize) {
+        this.desc = desc;
+        this.fixedSize = fixedSize;
+    }
+
+    /**
+     * @return {@code true} for fixed-length types, {@code false} otherwise.
+     */
+    public boolean fixedLength() {
+        return fixedSize;
+    }
+
+    /**
+     * Indirection method for getting an Object representation of the given 
type from the tuple. This method
+     * does do any type conversions and will throw an exception if tuple 
schema column type differs from this

Review comment:
       So does it do type conversions or not?

##########
File path: 
modules/commons/src/main/java/org/apache/ignite/internal/schema/Tuple.java
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * The class contains non-generic methods to read boxed and unboxed
+ * primitives based on the schema column types.
+ * Any type conversions and coercions should be implemented outside of the
+ * tuple by the key-value or query runtime.
+ * When a non-boxed primitive is read from a null column value, it is
+ * converted to the primitive type default value.
+ */
+public abstract class Tuple {
+    /**
+     * Size of the schema version field. Together with
+     * {@link #KEY_HASH_FIELD_SIZE} it gives the offset at which the first
+     * (key) chunk starts. NOTE(review): presumably in bytes - confirm.
+     */
+    public static final int SCHEMA_VERSION_FIELD_SIZE = 2;
+
+    /**
+     * Size of the key hash field, which is skipped together with the schema
+     * version field to reach the first chunk.
+     * NOTE(review): presumably in bytes - confirm.
+     */
+    public static final int KEY_HASH_FIELD_SIZE = 4;
+
+    /**
+     * Size of the total length field located at the start of a chunk.
+     * NOTE(review): presumably in bytes - confirm.
+     */
+    public static final int TOTAL_LEN_FIELD_SIZE = 2;
+
+    /**
+     * Size of the varsize table length field.
+     * NOTE(review): presumably in bytes - confirm.
+     */
+    public static final int VARSIZE_TABLE_LEN_FIELD_SIZE = 2;
+
+    /** Schema descriptor for which this tuple was created. */
+    private final SchemaDescriptor schema;
+
+    /**
+     * Creates a tuple bound to the given schema. The schema is used to
+     * resolve column types and positions when reading values.
+     *
+     * @param schema Schema instance.
+     */
+    protected Tuple(SchemaDescriptor schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#BYTE} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code BYTE}.
+     */
+    public byte byteValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? 0 : (byte)readByte(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#BYTE} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code BYTE}.
+     */
+    public Byte byteValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.BYTE);
+
+        return off < 0 ? null : (byte)readByte(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#SHORT} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code SHORT}.
+     */
+    public short shortValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? 0 : (short)readShort(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#SHORT} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code SHORT}.
+     */
+    public Short shortValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.SHORT);
+
+        return off < 0 ? null : (short)readShort(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#INTEGER} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code INTEGER}.
+     */
+    public int intValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? 0 : readInteger(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#INTEGER} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code INTEGER}.
+     */
+    public Integer intValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.INTEGER);
+
+        return off < 0 ? null : readInteger(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#LONG} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code LONG}.
+     */
+    public long longValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? 0 : readLong(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#LONG} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code LONG}.
+     */
+    public Long longValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.LONG);
+
+        return off < 0 ? null : readLong(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#FLOAT} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0.f} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code FLOAT}.
+     */
+    public float floatValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? 0.f : readFloat(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#FLOAT} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code FLOAT}.
+     */
+    public Float floatValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.FLOAT);
+
+        return off < 0 ? null : readFloat(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#DOUBLE} column as a primitive.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code 0.d} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code DOUBLE}.
+     */
+    public double doubleValue(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? 0.d : readDouble(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#DOUBLE} column as a boxed value.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code DOUBLE}.
+     */
+    public Double doubleValueBoxed(int col) {
+        long off = findColumn(col, NativeTypeSpec.DOUBLE);
+
+        return off < 0 ? null : readDouble(offset(off));
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#STRING} column. The column offset and
+     * length are both taken from the encoded {@code findColumn} result.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code STRING}.
+     */
+    public String stringValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.STRING);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readString(off, len);
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#BYTES} column. The column offset and
+     * length are both taken from the encoded {@code findColumn} result.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code BYTES}.
+     */
+    public byte[] bytesValue(int col) {
+        long offLen = findColumn(col, NativeTypeSpec.BYTES);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+        int len = length(offLen);
+
+        return readBytes(off, len);
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#UUID} column. Two longs are read: the
+     * least significant bits at the column offset and the most significant
+     * bits at the column offset plus 8.
+     *
+     * @param col Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type {@code UUID}.
+     */
+    public UUID uuidValue(int col) {
+        long found = findColumn(col, NativeTypeSpec.UUID);
+
+        if (found < 0)
+            return null;
+
+        int off = offset(found);
+
+        long lsb = readLong(off);
+        long msb = readLong(off + 8);
+
+        return new UUID(msb, lsb);
+    }
+
+    /**
+     * Reads a {@link NativeTypeSpec#BITMASK} column. The number of bytes to
+     * read is taken from the column type in the schema rather than from the
+     * encoded {@code findColumn} result, and the bytes are converted with
+     * {@link BitSet#valueOf(byte[])}.
+     *
+     * @param colIdx Column index.
+     * @return Column value, or {@code null} if the column value is null.
+     * @throws InvalidTypeException If the column is not of type
+     *      {@code BITMASK}.
+     */
+    public BitSet bitmaskValue(int colIdx) {
+        long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+
+        if (offLen < 0)
+            return null;
+
+        int off = offset(offLen);
+
+        Column col = schema.column(colIdx);
+
+        return BitSet.valueOf(readBytes(off, col.type().length()));
+    }
+
+    /**
+     * Gets the column offset and length encoded into a single 8-byte value
+     * (4 least significant bytes encoding the offset from the beginning of
+     * the tuple and 4 most significant bytes encoding the field length for
+     * varlength columns). The offset and length should be extracted using
+     * {@link #offset(long)} and {@link #length(long)} methods.
+     * Will also validate that the actual column type matches the requested
+     * column type. The type check is performed before the null check, so a
+     * type mismatch is reported even for null column values.
+     *
+     * @param colIdx Column index.
+     * @param type Expected column type.
+     * @return Encoded offset + length of the column, or {@code -1} if the
+     *      column value is null.
+     * @throws InvalidTypeException If actual column type does not match the
+     *      requested column type.
+     * @see #offset(long)
+     * @see #length(long)
+     */
+    private long findColumn(int colIdx, NativeTypeSpec type) {
+        // Get base offset (key start or value start) for the given column.
+        boolean keyCol = schema.keyColumn(colIdx);
+        Columns cols = schema.columns(colIdx);
+
+        int off = SCHEMA_VERSION_FIELD_SIZE + KEY_HASH_FIELD_SIZE;
+
+        if (!keyCol) {
+            // Jump to the next chunk, the size of the first chunk is written 
at the chunk start.
+            off += readShort(off);
+
+            // Adjust the column index according to the number of key columns.
+            colIdx -= schema.keyColumns().length();
+        }
+
+        Column col = cols.column(colIdx);
+
+        if (col.type().spec() != type)
+            throw new InvalidTypeException("Invalid column type requested 
[requested=" + type +
+                ", column=" + col + ']');
+
+        if (isNull(off, colIdx))
+            return -1;
+
+        return type.fixedLength() ?
+            fixlenColumnOffset(cols, off, colIdx) :
+            varlenColumnOffsetAndLength(cols, off, colIdx);
+    }
+
+    /**
+     * Checks the tuple null map for the given column index in the chunk.
+     * The null map holds one bit per column, 8 columns per byte, with the
+     * column's bit at position {@code idx % 8} of byte {@code idx / 8};
+     * a set bit means the column value is null.
+     *
+     * @param baseOff Offset of the chunk start in the tuple.
+     * @param idx Index of the column in the chunk.
+     * @return {@code true} if the column value is {@code null}.
+     */
+    private boolean isNull(int baseOff, int idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullByte = idx / 8;
+        int posInByte = idx % 8;
+
+        int map = readByte(nullMapOff + nullByte);
+
+        return (map & (1 << posInByte)) != 0;
+    }
+
+    /**
+     * Utility method to extract the column offset from the
+     * {@link #findColumn(int, NativeTypeSpec)} result. The offset is
+     * calculated from the beginning of the tuple and occupies the lower
+     * 32 bits of the encoded value.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Column offset from the beginning of the tuple.
+     */
+    private static int offset(long offLen) {
+        return (int)offLen;
+    }
+
+    /**
+     * Utility method to extract the column length from the
+     * {@link #findColumn(int, NativeTypeSpec)} result for varlength columns.
+     * The length occupies the upper 32 bits of the encoded value.
+     *
+     * @param offLen {@code findColumn} invocation result.
+     * @return Length of the column (presumably {@code 0} if the column is
+     *      fixed-length - depends on the fixed-length offset encoding).
+     */
+    private static int length(long offLen) {
+        return (int)(offLen >>> 32);
+    }
+
+    /**
+     * Calculates the offset and length of varlen column. First, it
+     * calculates the number of null varlen columns preceding the requested
+     * column by folding the null map bits. This number is used to adjust the
+     * column index and find the corresponding entry in the varlen table. The
+     * length of the column is calculated either by subtracting two adjacent
+     * varlen table offsets, or by subtracting the last varlen table offset
+     * from the chunk length.
+     *
+     * @param cols Columns chunk.
+     * @param baseOff Chunk base offset.
+     * @param idx Column index in the chunk.
+     * @return Encoded offset (from the tuple start) and length of the column 
with the given index.
+     */
+    private long varlenColumnOffsetAndLength(Columns cols, int baseOff, int 
idx) {
+        int nullMapOff = nullMapOffset(baseOff);
+
+        int nullStartByte = cols.firstVarlengthColumn() / 8;
+        int startBitInByte = cols.firstVarlengthColumn() % 8;
+
+        int nullEndByte = idx / 8;
+        int endBitInByte = idx % 8;
+        int numNullsBefore = 0;
+
+        for (int i = nullStartByte; i <= nullEndByte; i++) {
+            int nullmapByte = readByte(nullMapOff + i);
+
+            if (i == nullStartByte)
+                // We need to clear startBitInByte least significant bits
+                nullmapByte &= (0xFF << startBitInByte);
+
+            if (i == nullEndByte)
+                // We need to clear 8-endBitInByte most significant bits
+                nullmapByte &= (0xFF >> (8 - endBitInByte));
+
+            numNullsBefore += Columns.numberOfNullColumns(nullmapByte);
+        }
+
+        idx -= cols.numberOfFixsizeColumns() + numNullsBefore;
+        int vartableSize = readShort(baseOff + TOTAL_LEN_FIELD_SIZE);
+
+        int vartableOff = vartableOffset(baseOff);
+        // Offset of idx-th column is from base offset.
+        int resOff = readShort(vartableOff + 2 * idx);

Review comment:
       To be honest, I don't get this math. Why do we need to multiply by 2?
       Is it described somewhere?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to