This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e6ce1b  [FLINK-12563][table-runtime-blink] Introduce vector data 
format in blink
1e6ce1b is described below

commit 1e6ce1bb0c38be3ccf6baa5f3c7acedb86528080
Author: Jingsong Lee <lzljs3620...@aliyun.com>
AuthorDate: Thu May 23 14:45:24 2019 +0800

    [FLINK-12563][table-runtime-blink] Introduce vector data format in blink
    
    This closes #8492
---
 .../apache/flink/table/dataformat/ColumnarRow.java | 210 +++++++++++++++++++++
 .../dataformat/vector/AbstractColumnVector.java    |  62 ++++++
 .../dataformat/vector/BooleanColumnVector.java     |  26 +++
 .../table/dataformat/vector/ByteColumnVector.java  |  26 +++
 .../table/dataformat/vector/BytesColumnVector.java |  48 +++++
 .../table/dataformat/vector/ColumnVector.java      |  32 ++++
 .../flink/table/dataformat/vector/Dictionary.java  |  34 ++++
 .../dataformat/vector/DoubleColumnVector.java      |  26 +++
 .../table/dataformat/vector/FloatColumnVector.java |  26 +++
 .../table/dataformat/vector/IntColumnVector.java   |  26 +++
 .../table/dataformat/vector/LongColumnVector.java  |  26 +++
 .../table/dataformat/vector/ShortColumnVector.java |  26 +++
 .../dataformat/vector/VectorizedColumnBatch.java   | 134 +++++++++++++
 .../dataformat/vector/heap/AbstractHeapVector.java | 132 +++++++++++++
 .../dataformat/vector/heap/HeapBooleanVector.java  |  51 +++++
 .../dataformat/vector/heap/HeapByteVector.java     |  50 +++++
 .../dataformat/vector/heap/HeapBytesVector.java    | 137 ++++++++++++++
 .../dataformat/vector/heap/HeapDoubleVector.java   |  52 +++++
 .../dataformat/vector/heap/HeapFloatVector.java    |  51 +++++
 .../dataformat/vector/heap/HeapIntVector.java      |  50 +++++
 .../dataformat/vector/heap/HeapLongVector.java     |  50 +++++
 .../dataformat/vector/heap/HeapShortVector.java    |  50 +++++
 .../vector/VectorizedColumnBatchTest.java          | 190 +++++++++++++++++++
 23 files changed, 1515 insertions(+)

diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
new file mode 100644
index 0000000..8764c98
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
+import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
+
+/**
+ * Columnar row to support access to vector column data. It is a row view in 
{@link VectorizedColumnBatch}.
+ */
+public final class ColumnarRow implements BaseRow {
+       // Header byte of the row; only stored by setHeader and returned by getHeader.
+       private byte header;
+       // Batch this row is a view of; stays null after the no-arg constructor until one is set.
+       private VectorizedColumnBatch vectorizedColumnBatch;
+       // Index of the viewed row, passed as rowId to every batch accessor.
+       private int rowId;
+
+       /**
+        * Creates a row without a batch. All getters dereference the batch, so one must be
+        * attached through {@link #setVectorizedColumnBatch} before reading.
+        */
+       public ColumnarRow() {}
+
+       /** Creates a view of row 0 of the given batch. */
+       public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) {
+               this(vectorizedColumnBatch, 0);
+       }
+
+       /** Creates a view of row {@code rowId} of the given batch. */
+       public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int 
rowId) {
+               this.vectorizedColumnBatch = vectorizedColumnBatch;
+               this.rowId = rowId;
+       }
+
+       /** Points this row at another batch and moves the row index back to 0. */
+       public void setVectorizedColumnBatch(
+                       VectorizedColumnBatch vectorizedColumnBatch) {
+               this.vectorizedColumnBatch = vectorizedColumnBatch;
+               this.rowId = 0;
+       }
+
+       /** Moves the view to another row of the current batch; the index is not range-checked here. */
+       public void setRowId(int rowId) {
+               this.rowId = rowId;
+       }
+
+       @Override
+       public byte getHeader() {
+               return header;
+       }
+
+       @Override
+       public void setHeader(byte header) {
+               this.header = header;
+       }
+
+       /** Returns the number of columns of the underlying batch. */
+       @Override
+       public int getArity() {
+               return vectorizedColumnBatch.getArity();
+       }
+
+       // ---- Readers: each one forwards (rowId, ordinal) to the batch accessor of the same type. ----
+
+       @Override
+       public boolean isNullAt(int ordinal) {
+               return vectorizedColumnBatch.isNullAt(rowId, ordinal);
+       }
+
+       @Override
+       public boolean getBoolean(int ordinal) {
+               return vectorizedColumnBatch.getBoolean(rowId, ordinal);
+       }
+
+       @Override
+       public byte getByte(int ordinal) {
+               return vectorizedColumnBatch.getByte(rowId, ordinal);
+       }
+
+       @Override
+       public short getShort(int ordinal) {
+               return vectorizedColumnBatch.getShort(rowId, ordinal);
+       }
+
+       @Override
+       public int getInt(int ordinal) {
+               return vectorizedColumnBatch.getInt(rowId, ordinal);
+       }
+
+       @Override
+       public long getLong(int ordinal) {
+               return vectorizedColumnBatch.getLong(rowId, ordinal);
+       }
+
+       @Override
+       public float getFloat(int ordinal) {
+               return vectorizedColumnBatch.getFloat(rowId, ordinal);
+       }
+
+       @Override
+       public double getDouble(int ordinal) {
+               return vectorizedColumnBatch.getDouble(rowId, ordinal);
+       }
+
+       /** Builds a {@link BinaryString} from the data/offset/len triple of the column's bytes. */
+       @Override
+       public BinaryString getString(int ordinal) {
+               Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, 
ordinal);
+               return BinaryString.fromBytes(byteArray.data, byteArray.offset, 
byteArray.len);
+       }
+
+       @Override
+       public Decimal getDecimal(int ordinal, int precision, int scale) {
+               return vectorizedColumnBatch.getDecimal(rowId, ordinal, 
precision, scale);
+       }
+
+       /** Always throws: generic values cannot be read from a columnar row. */
+       @Override
+       public <T> BinaryGeneric<T> getGeneric(int pos) {
+               throw new UnsupportedOperationException("GenericType is not 
supported.");
+       }
+
+       /**
+        * Returns the binary value of the column. When the value spans the whole backing array
+        * ({@code len == data.length}) that array instance itself is returned, not a copy;
+        * otherwise the {@code len} bytes starting at {@code offset} are copied into a new array.
+        */
+       @Override
+       public byte[] getBinary(int ordinal) {
+               Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, 
ordinal);
+               if (byteArray.len == byteArray.data.length) {
+                       return byteArray.data;
+               } else {
+                       byte[] ret = new byte[byteArray.len];
+                       System.arraycopy(byteArray.data, byteArray.offset, ret, 
0, byteArray.len);
+                       return ret;
+               }
+       }
+
+       // ---- Nested types: not implemented yet, each accessor throws. ----
+
+       @Override
+       public BaseRow getRow(int ordinal, int numFields) {
+               // TODO
+               throw new UnsupportedOperationException("Row is not 
supported.");
+       }
+
+       @Override
+       public BinaryArray getArray(int ordinal) {
+               // TODO
+               throw new UnsupportedOperationException("Array is not 
supported.");
+       }
+
+       @Override
+       public BinaryMap getMap(int ordinal) {
+               // TODO
+               throw new UnsupportedOperationException("Map is not 
supported.");
+       }
+
+       // ---- Field setters: every one of them throws, field values cannot be changed through this view. ----
+
+       @Override
+       public void setNullAt(int ordinal) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setBoolean(int ordinal, boolean value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setByte(int ordinal, byte value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setShort(int ordinal, short value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setInt(int ordinal, int value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setLong(int ordinal, long value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setFloat(int pos, float value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setDouble(int ordinal, double value) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       @Override
+       public void setDecimal(int ordinal, Decimal value, int precision) {
+               throw new UnsupportedOperationException("Not support the 
operation!");
+       }
+
+       /** Always throws; callers have to compare the fields themselves. */
+       @Override
+       public boolean equals(Object o) {
+               throw new UnsupportedOperationException(
+                               "ColumnarRow do not support equals, please 
compare fields one by one!");
+       }
+
+       /** Always throws; callers have to hash the fields themselves. */
+       @Override
+       public int hashCode() {
+               throw new UnsupportedOperationException(
+                               "ColumnarRow do not support hashCode, please 
hash fields one by one!");
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java
new file mode 100644
index 0000000..10928af
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import java.io.Serializable;
+
+/**
+ * Contains the shared structure for {@link ColumnVector}s, including NULL 
information and dictionary.
+ * NOTE: if there are some nulls, must set {@link #noNulls} to false.
+ */
+public abstract class AbstractColumnVector implements ColumnVector, 
Serializable {
+
+       private static final long serialVersionUID = 5340018531388047747L;
+
+       // True if the whole column vector has no nulls, otherwise false. Starts out as true.
+       protected boolean noNulls = true;
+
+       /**
+        * The Dictionary for this column.
+        * If it's not null, it will be used to decode the value in get().
+        */
+       protected Dictionary dictionary;
+
+       /**
+        * Updates the dictionary. Passing {@code null} makes {@link #hasDictionary()} return false.
+        */
+       public void setDictionary(Dictionary dictionary) {
+               this.dictionary = dictionary;
+       }
+
+       /**
+        * Reserves an integer column for the ids of the dictionary.
+        * DictionaryIds may be inconsistent with {@link #setDictionary}. Suppose a ColumnVector's data
+        * comes from two pages. Perhaps one page uses a dictionary and the other page does not use a
+        * dictionary. The first page that uses a field will have dictionaryIds, which requires
+        * decoding the first page (our batch does not support a mix of dictionary).
+        */
+       public abstract IntColumnVector reserveDictionaryIds(int capacity);
+
+       /**
+        * Returns true if this column has a dictionary, i.e. {@link #dictionary} is not null.
+        */
+       public boolean hasDictionary() {
+               return this.dictionary != null;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java
new file mode 100644
index 0000000..6f2065d
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Boolean column vector.
+ */
+public interface BooleanColumnVector extends ColumnVector {
+       /** Returns the boolean value at row index {@code i}. */
+       boolean getBoolean(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java
new file mode 100644
index 0000000..fae9715
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Byte column vector.
+ */
+public interface ByteColumnVector extends ColumnVector {
+       /** Returns the byte value at row index {@code i}. */
+       byte getByte(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java
new file mode 100644
index 0000000..b09020b
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Bytes column vector to get {@link Bytes}, which includes the original data together with an
+ * offset and a length. The data in {@link Bytes} may be reused.
+ */
+public interface BytesColumnVector extends ColumnVector {
+       /** Returns the {@link Bytes} at row index {@code i}. */
+       Bytes getBytes(int i);
+
+       /**
+        * Bytes data: the slice of {@code len} bytes of {@code data} that starts at {@code offset}.
+        * The constructor stores the array reference as given, without copying it.
+        */
+       class Bytes{
+               public final byte[] data;
+               public final int offset;
+               public final int len;
+
+               public Bytes(byte[] data, int offset, int len) {
+                       this.data = data;
+                       this.offset = offset;
+                       this.len = len;
+               }
+
+               /** Copies the slice into a newly allocated array of exactly {@code len} bytes. */
+               public byte[] getBytes() {
+                       byte[] res = new byte[len];
+                       System.arraycopy(data, offset, res, 0, len);
+                       return res;
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java
new file mode 100644
index 0000000..ae372a2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Nullable column vector. Access data through specific subclasses.
+ */
+public interface ColumnVector {
+
+       /** Returns whether the value at row index {@code i} is null. */
+       boolean isNullAt(int i);
+
+       /**
+        * Resets the column to default state.
+        */
+       void reset();
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java
new file mode 100644
index 0000000..0f7d254
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * The interface for dictionary in AbstractColumnVector to decode dictionary 
encoded values.
+ */
+public interface Dictionary {
+
+       /** Decodes the dictionary id {@code id} to an int value. */
+       int decodeToInt(int id);
+
+       /** Decodes the dictionary id {@code id} to a long value. */
+       long decodeToLong(int id);
+
+       /** Decodes the dictionary id {@code id} to a float value. */
+       float decodeToFloat(int id);
+
+       /** Decodes the dictionary id {@code id} to a double value. */
+       double decodeToDouble(int id);
+
+       /** Decodes the dictionary id {@code id} to a binary value. */
+       byte[] decodeToBinary(int id);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java
new file mode 100644
index 0000000..0c1ee13
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Double column vector.
+ */
+public interface DoubleColumnVector extends ColumnVector {
+       /** Returns the double value at row index {@code i}. */
+       double getDouble(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java
new file mode 100644
index 0000000..6f4a561
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Float column vector.
+ */
+public interface FloatColumnVector extends ColumnVector {
+       /** Returns the float value at row index {@code i}. */
+       float getFloat(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java
new file mode 100644
index 0000000..9325f11
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Int column vector.
+ */
+public interface IntColumnVector extends ColumnVector {
+       /** Returns the int value at row index {@code i}. */
+       int getInt(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java
new file mode 100644
index 0000000..d27e0aa
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Long column vector.
+ */
+public interface LongColumnVector extends ColumnVector {
+       /** Returns the long value at row index {@code i}. */
+       long getLong(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java
new file mode 100644
index 0000000..7fa00c4
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Short column vector.
+ */
+public interface ShortColumnVector extends ColumnVector {
+       /** Returns the short value at row index {@code i}. */
+       short getShort(int i);
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
new file mode 100644
index 0000000..e6dc2fb
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A VectorizedColumnBatch is a set of rows, organized with each column as a 
vector. It is the
+ * unit of query execution, organized to minimize the cost per row.
+ *
+ * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive 
VectorizedRowBatch.
+ */
+public class VectorizedColumnBatch implements Serializable {
+       private static final long serialVersionUID = 8180323238728166155L;
+
+       /**
+        * This number is carefully chosen to minimize overhead and typically allows
+        * one VectorizedColumnBatch to fit in cache.
+        */
+       public static final int DEFAULT_SIZE = 2048;
+
+       /** Row count of the batch; only changed through {@link #setNumRows} and {@link #reset}. */
+       private int numRows;
+
+       /** Column vectors of the batch, indexed by column id. The array is stored as given, not copied. */
+       public final ColumnVector[] columns;
+
+       /**
+        * Creates a batch over the given column vectors.
+        *
+        * @param vectors one vector per column; its length is the arity of the batch
+        */
+       public VectorizedColumnBatch(ColumnVector[] vectors) {
+               this.columns = vectors;
+       }
+
+       /**
+        * Resets the batch for writing: resets every column and sets the row count to 0.
+        */
+       public void reset() {
+               for (ColumnVector column : columns) {
+                       column.reset();
+               }
+               this.numRows = 0;
+       }
+
+       public void setNumRows(int numRows) {
+               this.numRows = numRows;
+       }
+
+       public int getNumRows() {
+               return numRows;
+       }
+
+       /** Returns the number of columns. */
+       public int getArity() {
+               return columns.length;
+       }
+
+       // The typed accessors below cast the column to the matching vector interface, so asking
+       // for a type the column does not implement fails with a ClassCastException.
+
+       public boolean isNullAt(int rowId, int colId) {
+               return columns[colId].isNullAt(rowId);
+       }
+
+       public boolean getBoolean(int rowId, int colId) {
+               return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
+       }
+
+       public byte getByte(int rowId, int colId) {
+               return ((ByteColumnVector) columns[colId]).getByte(rowId);
+       }
+
+       public short getShort(int rowId, int colId) {
+               return ((ShortColumnVector) columns[colId]).getShort(rowId);
+       }
+
+       public int getInt(int rowId, int colId) {
+               return ((IntColumnVector) columns[colId]).getInt(rowId);
+       }
+
+       public long getLong(int rowId, int colId) {
+               return ((LongColumnVector) columns[colId]).getLong(rowId);
+       }
+
+       public float getFloat(int rowId, int colId) {
+               return ((FloatColumnVector) columns[colId]).getFloat(rowId);
+       }
+
+       public double getDouble(int rowId, int colId) {
+               return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
+       }
+
+       /** Returns the data/offset/len triple of the value, without copying the bytes here. */
+       public Bytes getByteArray(int rowId, int colId) {
+               return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+       }
+
+       /**
+        * Returns the value as a plain array: the backing array itself when the value spans it
+        * entirely, otherwise a copy of the slice.
+        */
+       private byte[] getBytes(int rowId, int colId) {
+               Bytes byteArray = getByteArray(rowId, colId);
+               if (byteArray.len == byteArray.data.length) {
+                       return byteArray.data;
+               } else {
+                       return byteArray.getBytes();
+               }
+       }
+
+       /**
+        * Decodes the bytes of the value as a UTF-8 string.
+        *
+        * <p>The charset is passed explicitly so that the result does not depend on the default
+        * charset of the JVM the code happens to run on.
+        */
+       public String getString(int rowId, int colId) {
+               Bytes byteArray = getByteArray(rowId, colId);
+               return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
+       }
+
+       /**
+        * Reads a decimal of the given precision and scale.
+        *
+        * <p>The physical column type depends on the precision: an int column when
+        * {@code Decimal.is32BitDecimal(precision)}, a long column when
+        * {@code Decimal.is64BitDecimal(precision)}, otherwise a bytes column holding the
+        * unscaled value.
+        *
+        * @return the decimal, or {@code null} if the value at this position is null
+        */
+       public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
+               if (isNullAt(rowId, colId)) {
+                       return null;
+               }
+
+               if (Decimal.is32BitDecimal(precision)) {
+                       return Decimal.fromUnscaledLong(precision, scale, getInt(rowId, colId));
+               } else if (Decimal.is64BitDecimal(precision)) {
+                       return Decimal.fromUnscaledLong(precision, scale, getLong(rowId, colId));
+               } else {
+                       byte[] bytes = getBytes(rowId, colId);
+                       return Decimal.fromUnscaledBytes(precision, scale, bytes);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
new file mode 100644
index 0000000..2af5be5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.vector.AbstractColumnVector;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Arrays;
+
+/**
+ * Base class of the heap-backed column vectors. It holds the state that every concrete
+ * heap vector shares: the per-row null mask and the reusable vector of dictionary ids.
+ */
+public abstract class AbstractHeapVector extends AbstractColumnVector {
+
+       /**
+        * Per-row null flags: an entry is true if the value at that row is null, otherwise
+        * false. The array is always allocated, so a batch can be re-used later and nulls added.
+        */
+       protected boolean[] isNull;
+
+       /**
+        * Reusable column for ids of dictionary.
+        */
+       protected HeapIntVector dictionaryIds;
+
+       /**
+        * @param len number of rows the null mask is allocated for
+        */
+       public AbstractHeapVector(int len) {
+               isNull = new boolean[len];
+       }
+
+       /**
+        * Resets the column to default state.
+        * - fills the isNull array with false.
+        * - sets noNulls to true.
+        */
+       @Override
+       public void reset() {
+               // The mask only needs clearing if a null was recorded since the last reset.
+               if (!noNulls) {
+                       Arrays.fill(isNull, false);
+               }
+               noNulls = true;
+       }
+
+       /**
+        * Marks row {@code i} as null and records that this vector now contains nulls.
+        */
+       public void setNullAt(int i) {
+               isNull[i] = true;
+               noNulls = false;
+       }
+
+       @Override
+       public boolean isNullAt(int i) {
+               return !noNulls && isNull[i];
+       }
+
+       /**
+        * Returns a dictionary-id vector able to hold at least {@code capacity} ids.
+        *
+        * <p>An existing vector is reset and re-used when it is large enough. If it is too
+        * small for the requested capacity, a new one is allocated; otherwise callers that
+        * fill {@code capacity} ids would write past the end of the old array.
+        *
+        * @param capacity number of ids the returned vector must be able to hold
+        * @return the (possibly newly allocated) dictionary-id vector
+        */
+       @Override
+       public HeapIntVector reserveDictionaryIds(int capacity) {
+               if (dictionaryIds == null || capacity > dictionaryIds.vector.length) {
+                       dictionaryIds = new HeapIntVector(capacity);
+               } else {
+                       dictionaryIds.reset();
+               }
+               return dictionaryIds;
+       }
+
+       /**
+        * Returns the underlying integer column for ids of dictionary.
+        */
+       public HeapIntVector getDictionaryIds() {
+               return dictionaryIds;
+       }
+
+       /**
+        * Allocates one heap vector per field type, each sized for {@code maxRows} rows.
+        *
+        * @param fieldTypes logical types of the columns, in column order
+        * @param maxRows    number of rows every vector is allocated for
+        * @return the vectors, in the same order as {@code fieldTypes}
+        */
+       public static AbstractHeapVector[] allocateHeapVectors(LogicalType[] fieldTypes, int maxRows) {
+               AbstractHeapVector[] columns = new AbstractHeapVector[fieldTypes.length];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       columns[i] = createHeapColumn(fieldTypes[i], maxRows);
+               }
+               return columns;
+       }
+
+       /**
+        * Creates the heap vector that stores values of the given logical type.
+        *
+        * <p>Decimals are stored by precision: as ints, as longs, or as bytes for anything
+        * wider, following {@link Decimal#is32BitDecimal} and {@link Decimal#is64BitDecimal}.
+        *
+        * @param fieldType logical type of the column
+        * @param maxRows   number of rows the vector is allocated for
+        * @return a new heap vector for the type
+        * @throws UnsupportedOperationException if the type root is not handled here
+        */
+       public static AbstractHeapVector createHeapColumn(LogicalType fieldType, int maxRows) {
+               switch (fieldType.getTypeRoot()) {
+                       case BOOLEAN:
+                               return new HeapBooleanVector(maxRows);
+                       case TINYINT:
+                               return new HeapByteVector(maxRows);
+                       case DOUBLE:
+                               return new HeapDoubleVector(maxRows);
+                       case FLOAT:
+                               return new HeapFloatVector(maxRows);
+                       case INTEGER:
+                       case DATE:
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return new HeapIntVector(maxRows);
+                       case BIGINT:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return new HeapLongVector(maxRows);
+                       case DECIMAL:
+                               DecimalType decimalType = (DecimalType) fieldType;
+                               if (Decimal.is32BitDecimal(decimalType.getPrecision())) {
+                                       return new HeapIntVector(maxRows);
+                               } else if (Decimal.is64BitDecimal(decimalType.getPrecision())) {
+                                       return new HeapLongVector(maxRows);
+                               } else {
+                                       return new HeapBytesVector(maxRows);
+                               }
+                       case SMALLINT:
+                               return new HeapShortVector(maxRows);
+                       case VARCHAR:
+                       case VARBINARY:
+                               return new HeapBytesVector(maxRows);
+                       default:
+                               throw new UnsupportedOperationException(fieldType + " is not supported now.");
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java
new file mode 100644
index 0000000..52c8f0d1
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+
+/**
+ * Nullable column vector that keeps boolean values in a heap array.
+ * Dictionary encoding is not available for this vector type.
+ */
+public class HeapBooleanVector extends AbstractHeapVector implements BooleanColumnVector {
+
+       private static final long serialVersionUID = 4131239076731313596L;
+
+       /** Message of the exception raised by the dictionary accessors. */
+       private static final String NO_DICTIONARY_MESSAGE = "HeapBooleanVector has no dictionary.";
+
+       /** Values of the column, indexed by row. */
+       public boolean[] vector;
+
+       /**
+        * @param len the number of rows
+        */
+       public HeapBooleanVector(int len) {
+               super(len);
+               this.vector = new boolean[len];
+       }
+
+       /** Always fails: boolean vectors are never dictionary encoded. */
+       @Override
+       public HeapIntVector reserveDictionaryIds(int capacity) {
+               throw new RuntimeException(NO_DICTIONARY_MESSAGE);
+       }
+
+       /** Always fails: boolean vectors are never dictionary encoded. */
+       @Override
+       public HeapIntVector getDictionaryIds() {
+               throw new RuntimeException(NO_DICTIONARY_MESSAGE);
+       }
+
+       @Override
+       public boolean getBoolean(int i) {
+               return this.vector[i];
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java
new file mode 100644
index 0000000..e084554
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+
+/**
+ * This class represents a nullable byte column vector.
+ */
+public class HeapByteVector extends AbstractHeapVector implements 
ByteColumnVector {
+
+       private static final long serialVersionUID = 7216045902943789034L;
+
+       public byte[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapByteVector(int len) {
+               super(len);
+               vector = new byte[len];
+       }
+
+       @Override
+       public byte getByte(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return (byte) 
dictionary.decodeToInt(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java
new file mode 100644
index 0000000..7d3a992
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+
+/**
+ * Nullable heap column vector for variable-length binary data such as strings and byte
+ * arrays.
+ *
+ * <p>Values written with {@code setVal} are copied back to back into one shared
+ * {@link #buffer}; {@link #start} and {@link #length} record where the bytes of each row
+ * live inside that buffer. The buffer grows on demand.
+ *
+ * <p>When a dictionary is set, {@link #getBytes(int)} decodes the value from the
+ * dictionary id of the row instead of reading the buffer.
+ */
+public class HeapBytesVector extends AbstractHeapVector implements BytesColumnVector {
+
+       private static final long serialVersionUID = -8529155738773478597L;
+
+       /**
+        * Largest buffer size that is requested when growing; a few bytes below
+        * {@code Integer.MAX_VALUE} because some JVMs cannot allocate arrays of the full size.
+        */
+       private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+       /**
+        * start offset of each field.
+        */
+       public int[] start;
+
+       /**
+        * The length of each field.
+        */
+       public int[] length;
+
+       /**
+        * buffer to use when actually copying in data.
+        */
+       public byte[] buffer;
+
+       /**
+        * Number of bytes already written into {@link #buffer}; this is also the offset at
+        * which the next value is appended.
+        */
+       private int elementsAppended = 0;
+
+       /**
+        * Current size of {@link #buffer} in bytes.
+        */
+       private int capacity;
+
+       /**
+        * Creates a vector for {@code size} elements. The data buffer initially also has
+        * {@code size} bytes and grows when more room is needed.
+        *
+        * @param size number of elements in the column vector
+        */
+       public HeapBytesVector(int size) {
+               super(size);
+               capacity = size;
+               buffer = new byte[capacity];
+               start = new int[size];
+               length = new int[size];
+       }
+
+       /**
+        * Resets the null state and rewinds the write position, so that the buffer is
+        * overwritten from its beginning by subsequent {@code setVal} calls.
+        */
+       @Override
+       public void reset() {
+               super.reset();
+               elementsAppended = 0;
+       }
+
+       /**
+        * Set a field by actually copying in to a local buffer.
+        * The bytes are appended to the shared buffer, which is grown first if necessary.
+        *
+        * @param elementNum index within column vector to set
+        * @param sourceBuf  container of source data
+        * @param start      start byte position within source
+        * @param length     length of source byte sequence
+        * @throws UnsupportedOperationException if the buffer cannot be grown because the
+        *                                       JVM is out of memory
+        */
+       public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
+               reserve(elementsAppended + length);
+               System.arraycopy(sourceBuf, start, buffer, elementsAppended, length);
+               this.start[elementNum] = elementsAppended;
+               this.length[elementNum] = length;
+               elementsAppended += length;
+       }
+
+       /**
+        * Set a field by copying the whole of {@code sourceBuf} in to the local buffer.
+        *
+        * @param elementNum index within column vector to set
+        * @param sourceBuf  container of source data
+        */
+       public void setVal(int elementNum, byte[] sourceBuf) {
+               setVal(elementNum, sourceBuf, 0, sourceBuf.length);
+       }
+
+       /**
+        * Makes sure the buffer holds at least {@code requiredCapacity} bytes, preserving
+        * the bytes appended so far.
+        */
+       private void reserve(int requiredCapacity) {
+               if (requiredCapacity > capacity) {
+                       // Double in long arithmetic: "requiredCapacity * 2" as an int wraps to a
+                       // negative size for requests of 2^30 bytes or more.
+                       long doubled = Math.min(requiredCapacity * 2L, (long) MAX_BUFFER_SIZE);
+                       int newCapacity = (int) Math.max((long) requiredCapacity, doubled);
+                       try {
+                               byte[] newData = new byte[newCapacity];
+                               System.arraycopy(buffer, 0, newData, 0, elementsAppended);
+                               buffer = newData;
+                               capacity = newCapacity;
+                       } catch (OutOfMemoryError outOfMemoryError) {
+                               throw new UnsupportedOperationException(requiredCapacity + " cannot be satisfied.", outOfMemoryError);
+                       }
+               }
+       }
+
+       /**
+        * Returns the value at row {@code i}. Without a dictionary the result is a view on
+        * the shared buffer (no copy); with a dictionary it wraps the decoded byte array.
+        */
+       @Override
+       public Bytes getBytes(int i) {
+               if (dictionary == null) {
+                       return new Bytes(buffer, start[i], length[i]);
+               } else {
+                       byte[] bytes = dictionary.decodeToBinary(dictionaryIds.vector[i]);
+                       return new Bytes(bytes, 0, bytes.length);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java
new file mode 100644
index 0000000..9e83bf3
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+
+/**
+ * This class represents a nullable double precision floating point column 
vector.
+ * This class will be used for operations on all floating point double types
+ * and as such will use a 64-bit double value to hold the biggest possible 
value.
+ */
+public class HeapDoubleVector extends AbstractHeapVector implements 
DoubleColumnVector {
+
+       private static final long serialVersionUID = 6193940154117411328L;
+
+       public double[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapDoubleVector(int len) {
+               super(len);
+               vector = new double[len];
+       }
+
+       @Override
+       public double getDouble(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return 
dictionary.decodeToDouble(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java
new file mode 100644
index 0000000..116f59a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+
+/**
+ * This class represents a nullable double precision floating point column 
vector.
+ * This class will be used for operations on all floating point float types.
+ */
+public class HeapFloatVector extends AbstractHeapVector implements 
FloatColumnVector {
+
+       private static final long serialVersionUID = 8928878923550041110L;
+
+       public float[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapFloatVector(int len) {
+               super(len);
+               vector = new float[len];
+       }
+
+       @Override
+       public float getFloat(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return 
dictionary.decodeToFloat(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java
new file mode 100644
index 0000000..dfb0aeb
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+
+/**
+ * This class represents a nullable int column vector.
+ */
+public class HeapIntVector extends AbstractHeapVector implements 
IntColumnVector {
+
+       private static final long serialVersionUID = -2749499358889718254L;
+
+       public int[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapIntVector(int len) {
+               super(len);
+               vector = new int[len];
+       }
+
+       @Override
+       public int getInt(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return dictionary.decodeToInt(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java
new file mode 100644
index 0000000..479a592
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+
+/**
+ * This class represents a nullable long column vector.
+ */
+public class HeapLongVector extends AbstractHeapVector implements 
LongColumnVector {
+
+       private static final long serialVersionUID = 8534925169458006397L;
+
+       public long[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapLongVector(int len) {
+               super(len);
+               vector = new long[len];
+       }
+
+       @Override
+       public long getLong(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return dictionary.decodeToLong(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java
new file mode 100644
index 0000000..c4dbf83
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+
+/**
+ * This class represents a nullable short column vector.
+ */
+public class HeapShortVector extends AbstractHeapVector implements 
ShortColumnVector {
+
+       private static final long serialVersionUID = -8278486456144676292L;
+
+       public short[] vector;
+
+       /**
+        * Don't use this except for testing purposes.
+        *
+        * @param len the number of rows
+        */
+       public HeapShortVector(int len) {
+               super(len);
+               vector = new short[len];
+       }
+
+       @Override
+       public short getShort(int i) {
+               if (dictionary == null) {
+                       return vector[i];
+               } else {
+                       return (short) 
dictionary.decodeToInt(dictionaryIds.vector[i]);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
new file mode 100644
index 0000000..049b934
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import org.apache.flink.table.dataformat.ColumnarRow;
+import org.apache.flink.table.dataformat.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapByteVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapBytesVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapFloatVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapIntVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapLongVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapShortVector;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link VectorizedColumnBatch}.
+ */
+public class VectorizedColumnBatchTest {
+
+       private static final int VECTOR_SIZE = 1024;
+
+       /**
+        * Fills one heap vector per supported type and checks that {@link ColumnarRow} reads back,
+        * for every row, the value that was written into each column.
+        */
+       @Test
+       public void testTyped() {
+               HeapBooleanVector col0 = new HeapBooleanVector(VECTOR_SIZE);
+               HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
+               HeapByteVector col2 = new HeapByteVector(VECTOR_SIZE);
+               HeapDoubleVector col3 = new HeapDoubleVector(VECTOR_SIZE);
+               HeapFloatVector col4 = new HeapFloatVector(VECTOR_SIZE);
+               HeapIntVector col5 = new HeapIntVector(VECTOR_SIZE);
+               HeapLongVector col6 = new HeapLongVector(VECTOR_SIZE);
+               HeapShortVector col7 = new HeapShortVector(VECTOR_SIZE);
+
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       col0.vector[i] = i % 2 == 0;
+                       // Encode with an explicit charset so the test does not depend on the platform default.
+                       byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
+                       col1.setVal(i, bytes, 0, bytes.length);
+                       col2.vector[i] = (byte) i;
+                       col3.vector[i] = i;
+                       col4.vector[i] = i;
+                       col5.vector[i] = i;
+                       col6.vector[i] = i;
+                       col7.vector[i] = (short) i;
+               }
+
+               VectorizedColumnBatch batch = new VectorizedColumnBatch(
+                               new ColumnVector[]{col0, col1, col2, col3, col4, col5, col6, col7});
+
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       ColumnarRow row = new ColumnarRow(batch, i);
+                       // JUnit convention: expected value first, actual value second.
+                       assertEquals(i % 2 == 0, row.getBoolean(0));
+                       assertEquals(String.valueOf(i), row.getString(1).toString());
+                       assertEquals((byte) i, row.getByte(2));
+                       assertEquals((double) i, row.getDouble(3), 0);
+                       assertEquals((float) i, row.getFloat(4), 0);
+                       assertEquals(i, row.getInt(5));
+                       assertEquals((long) i, row.getLong(6));
+                       assertEquals((short) i, row.getShort(7));
+               }
+       }
+
+       /**
+        * Checks null handling for a column where every row is null and a column where only the
+        * even rows are null.
+        */
+       @Test
+       public void testNull() {
+               // all null
+               HeapIntVector col0 = new HeapIntVector(VECTOR_SIZE);
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       col0.setNullAt(i);
+               }
+
+               // some null: even rows are null, odd rows hold their own index
+               HeapIntVector col1 = new HeapIntVector(VECTOR_SIZE);
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       if (i % 2 == 0) {
+                               col1.setNullAt(i);
+                       } else {
+                               col1.vector[i] = i;
+                       }
+               }
+
+               VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col0, col1});
+
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       ColumnarRow row = new ColumnarRow(batch, i);
+                       assertTrue(row.isNullAt(0));
+                       if (i % 2 == 0) {
+                               assertTrue(row.isNullAt(1));
+                       } else {
+                               assertEquals(i, row.getInt(1));
+                       }
+               }
+       }
+
+       /**
+        * Checks that an int column with a dictionary set returns the dictionary entry selected by
+        * each row's dictionary id.
+        */
+       @Test
+       public void testDictionary() {
+               // dictionary-encoded column: even rows use id 0, odd rows use id 1
+               HeapIntVector col = new HeapIntVector(VECTOR_SIZE);
+               int[] dict = new int[2];
+               dict[0] = 1998;
+               dict[1] = 9998;
+               col.setDictionary(new TestDictionary(dict));
+               HeapIntVector heapIntVector = col.reserveDictionaryIds(VECTOR_SIZE);
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       heapIntVector.vector[i] = i % 2 == 0 ? 0 : 1;
+               }
+
+               VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col});
+
+               for (int i = 0; i < VECTOR_SIZE; i++) {
+                       ColumnarRow row = new ColumnarRow(batch, i);
+                       if (i % 2 == 0) {
+                               assertEquals(1998, row.getInt(0));
+                       } else {
+                               assertEquals(9998, row.getInt(0));
+                       }
+               }
+       }
+
+       /**
+        * Int-only {@link Dictionary} backed by an array; every other decode method throws
+        * {@link UnsupportedOperationException}.
+        */
+       private static final class TestDictionary implements Dictionary {
+               private final int[] intDictionary;
+
+               public TestDictionary(int[] dictionary) {
+                       this.intDictionary = dictionary;
+               }
+
+               @Override
+               public int decodeToInt(int id) {
+                       return intDictionary[id];
+               }
+
+               @Override
+               public long decodeToLong(int id) {
+                       throw new UnsupportedOperationException("Dictionary encoding does not support long");
+               }
+
+               @Override
+               public float decodeToFloat(int id) {
+                       throw new UnsupportedOperationException("Dictionary encoding does not support float");
+               }
+
+               @Override
+               public double decodeToDouble(int id) {
+                       throw new UnsupportedOperationException("Dictionary encoding does not support double");
+               }
+
+               @Override
+               public byte[] decodeToBinary(int id) {
+                       throw new UnsupportedOperationException("Dictionary encoding does not support String");
+               }
+       }
+}

Reply via email to