This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1e6ce1b [FLINK-12563][table-runtime-blink] Introduce vector data format in blink 1e6ce1b is described below commit 1e6ce1bb0c38be3ccf6baa5f3c7acedb86528080 Author: Jingsong Lee <lzljs3620...@aliyun.com> AuthorDate: Thu May 23 14:45:24 2019 +0800 [FLINK-12563][table-runtime-blink] Introduce vector data format in blink This closes #8492 --- .../apache/flink/table/dataformat/ColumnarRow.java | 210 +++++++++++++++++++++ .../dataformat/vector/AbstractColumnVector.java | 62 ++++++ .../dataformat/vector/BooleanColumnVector.java | 26 +++ .../table/dataformat/vector/ByteColumnVector.java | 26 +++ .../table/dataformat/vector/BytesColumnVector.java | 48 +++++ .../table/dataformat/vector/ColumnVector.java | 32 ++++ .../flink/table/dataformat/vector/Dictionary.java | 34 ++++ .../dataformat/vector/DoubleColumnVector.java | 26 +++ .../table/dataformat/vector/FloatColumnVector.java | 26 +++ .../table/dataformat/vector/IntColumnVector.java | 26 +++ .../table/dataformat/vector/LongColumnVector.java | 26 +++ .../table/dataformat/vector/ShortColumnVector.java | 26 +++ .../dataformat/vector/VectorizedColumnBatch.java | 134 +++++++++++++ .../dataformat/vector/heap/AbstractHeapVector.java | 132 +++++++++++++ .../dataformat/vector/heap/HeapBooleanVector.java | 51 +++++ .../dataformat/vector/heap/HeapByteVector.java | 50 +++++ .../dataformat/vector/heap/HeapBytesVector.java | 137 ++++++++++++++ .../dataformat/vector/heap/HeapDoubleVector.java | 52 +++++ .../dataformat/vector/heap/HeapFloatVector.java | 51 +++++ .../dataformat/vector/heap/HeapIntVector.java | 50 +++++ .../dataformat/vector/heap/HeapLongVector.java | 50 +++++ .../dataformat/vector/heap/HeapShortVector.java | 50 +++++ .../vector/VectorizedColumnBatchTest.java | 190 +++++++++++++++++++ 23 files changed, 1515 insertions(+) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java new file mode 100644 index 0000000..8764c98 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes; +import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; + +/** + * Columnar row to support access to vector column data. It is a row view in {@link VectorizedColumnBatch}. 
+ */ +public final class ColumnarRow implements BaseRow { + private byte header; + private VectorizedColumnBatch vectorizedColumnBatch; + private int rowId; + + public ColumnarRow() {} + + public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) { + this(vectorizedColumnBatch, 0); + } + + public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int rowId) { + this.vectorizedColumnBatch = vectorizedColumnBatch; + this.rowId = rowId; + } + + public void setVectorizedColumnBatch( + VectorizedColumnBatch vectorizedColumnBatch) { + this.vectorizedColumnBatch = vectorizedColumnBatch; + this.rowId = 0; + } + + public void setRowId(int rowId) { + this.rowId = rowId; + } + + @Override + public byte getHeader() { + return header; + } + + @Override + public void setHeader(byte header) { + this.header = header; + } + + @Override + public int getArity() { + return vectorizedColumnBatch.getArity(); + } + + @Override + public boolean isNullAt(int ordinal) { + return vectorizedColumnBatch.isNullAt(rowId, ordinal); + } + + @Override + public boolean getBoolean(int ordinal) { + return vectorizedColumnBatch.getBoolean(rowId, ordinal); + } + + @Override + public byte getByte(int ordinal) { + return vectorizedColumnBatch.getByte(rowId, ordinal); + } + + @Override + public short getShort(int ordinal) { + return vectorizedColumnBatch.getShort(rowId, ordinal); + } + + @Override + public int getInt(int ordinal) { + return vectorizedColumnBatch.getInt(rowId, ordinal); + } + + @Override + public long getLong(int ordinal) { + return vectorizedColumnBatch.getLong(rowId, ordinal); + } + + @Override + public float getFloat(int ordinal) { + return vectorizedColumnBatch.getFloat(rowId, ordinal); + } + + @Override + public double getDouble(int ordinal) { + return vectorizedColumnBatch.getDouble(rowId, ordinal); + } + + @Override + public BinaryString getString(int ordinal) { + Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, ordinal); + return 
BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return vectorizedColumnBatch.getDecimal(rowId, ordinal, precision, scale); + } + + @Override + public <T> BinaryGeneric<T> getGeneric(int pos) { + throw new UnsupportedOperationException("GenericType is not supported."); + } + + @Override + public byte[] getBinary(int ordinal) { + Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, ordinal); + if (byteArray.len == byteArray.data.length) { + return byteArray.data; + } else { + byte[] ret = new byte[byteArray.len]; + System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len); + return ret; + } + } + + @Override + public BaseRow getRow(int ordinal, int numFields) { + // TODO + throw new UnsupportedOperationException("Row is not supported."); + } + + @Override + public BinaryArray getArray(int ordinal) { + // TODO + throw new UnsupportedOperationException("Array is not supported."); + } + + @Override + public BinaryMap getMap(int ordinal) { + // TODO + throw new UnsupportedOperationException("Map is not supported."); + } + + @Override + public void setNullAt(int ordinal) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setBoolean(int ordinal, boolean value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setByte(int ordinal, byte value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setShort(int ordinal, short value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setInt(int ordinal, int value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setLong(int ordinal, long value) { + throw new UnsupportedOperationException("Not support the operation!"); 
+ } + + @Override + public void setFloat(int pos, float value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setDouble(int ordinal, double value) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public void setDecimal(int ordinal, Decimal value, int precision) { + throw new UnsupportedOperationException("Not support the operation!"); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException( + "ColumnarRow do not support equals, please compare fields one by one!"); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException( + "ColumnarRow do not support hashCode, please hash fields one by one!"); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java new file mode 100644 index 0000000..10928af --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +import java.io.Serializable; + +/** + * Contains the shared structure for {@link ColumnVector}s, including NULL information and dictionary. + * NOTE: if there are some nulls, must set {@link #noNulls} to false. + */ +public abstract class AbstractColumnVector implements ColumnVector, Serializable { + + private static final long serialVersionUID = 5340018531388047747L; + + // If the whole column vector has no nulls, this is true, otherwise false. + protected boolean noNulls = true; + + /** + * The Dictionary for this column. + * If it's not null, will be used to decode the value in get(). + */ + protected Dictionary dictionary; + + /** + * Update the dictionary. + */ + public void setDictionary(Dictionary dictionary) { + this.dictionary = dictionary; + } + + /** + * Reserve an integer column for ids of dictionary. + * DictionaryIds may be inconsistent with {@link #setDictionary}. Suppose a ColumnVector's data + * comes from two pages. Perhaps one page uses a dictionary and the other page does not use a + * dictionary. The first page that uses a field will have dictionaryIds, which requires + * decoding the first page (Out batch does not support a mix of dictionary). + */ + public abstract IntColumnVector reserveDictionaryIds(int capacity); + + /** + * Returns true if this column has a dictionary. 
+ */ + public boolean hasDictionary() { + return this.dictionary != null; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java new file mode 100644 index 0000000..6f2065d --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Boolean column vector. 
+ */ +public interface BooleanColumnVector extends ColumnVector { + boolean getBoolean(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java new file mode 100644 index 0000000..fae9715 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Byte column vector. 
+ */ +public interface ByteColumnVector extends ColumnVector { + byte getByte(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java new file mode 100644 index 0000000..b09020b --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Bytes column vector to get {@link Bytes}, which includes the original data, offset and length. + * The data in {@link Bytes} may be reused. + */ +public interface BytesColumnVector extends ColumnVector { + Bytes getBytes(int i); + + /** + * Bytes data. 
+ */ + class Bytes{ + public final byte[] data; + public final int offset; + public final int len; + + public Bytes(byte[] data, int offset, int len) { + this.data = data; + this.offset = offset; + this.len = len; + } + + public byte[] getBytes() { + byte[] res = new byte[len]; + System.arraycopy(data, offset, res, 0, len); + return res; + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java new file mode 100644 index 0000000..ae372a2 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Nullable column vector. Access data through specific subclasses. + */ +public interface ColumnVector { + + boolean isNullAt(int i); + + /** + * Resets the column to default state. 
+ */ + void reset(); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java new file mode 100644 index 0000000..0f7d254 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * The interface for dictionary in AbstractColumnVector to decode dictionary encoded values. 
+ */ +public interface Dictionary { + + int decodeToInt(int id); + + long decodeToLong(int id); + + float decodeToFloat(int id); + + double decodeToDouble(int id); + + byte[] decodeToBinary(int id); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java new file mode 100644 index 0000000..0c1ee13 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Double column vector. 
+ */ +public interface DoubleColumnVector extends ColumnVector { + double getDouble(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java new file mode 100644 index 0000000..6f4a561 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Float column vector. 
+ */ +public interface FloatColumnVector extends ColumnVector { + float getFloat(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java new file mode 100644 index 0000000..9325f11 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Int column vector. 
+ */ +public interface IntColumnVector extends ColumnVector { + int getInt(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java new file mode 100644 index 0000000..d27e0aa --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Long column vector. 
+ */ +public interface LongColumnVector extends ColumnVector { + long getLong(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java new file mode 100644 index 0000000..7fa00c4 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +/** + * Short column vector. 
+ */ +public interface ShortColumnVector extends ColumnVector { + short getShort(int i); +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java new file mode 100644 index 0000000..e6dc2fb --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes; + +import java.io.Serializable; + +/** + * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the + * unit of query execution, organized to minimize the cost per row. + * + * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive VectorizedRowBatch. 
+ */ +public class VectorizedColumnBatch implements Serializable { + private static final long serialVersionUID = 8180323238728166155L; + + /** + * This number is carefully chosen to minimize overhead and typically allows + * one VectorizedColumnBatch to fit in cache. + */ + public static final int DEFAULT_SIZE = 2048; + + private int numRows; + public final ColumnVector[] columns; + + public VectorizedColumnBatch(ColumnVector[] vectors) { + this.columns = vectors; + } + + /** + * Resets the batch for writing. + */ + public void reset() { + for (ColumnVector column : columns) { + column.reset(); + } + this.numRows = 0; + } + + public void setNumRows(int numRows) { + this.numRows = numRows; + } + + public int getNumRows() { + return numRows; + } + + public int getArity() { + return columns.length; + } + + public boolean isNullAt(int rowId, int colId) { + return columns[colId].isNullAt(rowId); + } + + public boolean getBoolean(int rowId, int colId) { + return ((BooleanColumnVector) columns[colId]).getBoolean(rowId); + } + + public byte getByte(int rowId, int colId) { + return ((ByteColumnVector) columns[colId]).getByte(rowId); + } + + public short getShort(int rowId, int colId) { + return ((ShortColumnVector) columns[colId]).getShort(rowId); + } + + public int getInt(int rowId, int colId) { + return ((IntColumnVector) columns[colId]).getInt(rowId); + } + + public long getLong(int rowId, int colId) { + return ((LongColumnVector) columns[colId]).getLong(rowId); + } + + public float getFloat(int rowId, int colId) { + return ((FloatColumnVector) columns[colId]).getFloat(rowId); + } + + public double getDouble(int rowId, int colId) { + return ((DoubleColumnVector) columns[colId]).getDouble(rowId); + } + + public Bytes getByteArray(int rowId, int colId) { + return ((BytesColumnVector) columns[colId]).getBytes(rowId); + } + + private byte[] getBytes(int rowId, int colId) { + Bytes byteArray = getByteArray(rowId, colId); + if (byteArray.len == byteArray.data.length) { + 
return byteArray.data; + } else { + return byteArray.getBytes(); + } + } + + public String getString(int rowId, int colId) { + Bytes byteArray = getByteArray(rowId, colId); + return new String(byteArray.data, byteArray.offset, byteArray.len); + } + + public Decimal getDecimal(int rowId, int colId, int precision, int scale) { + if (isNullAt(rowId, colId)) { + return null; + } + + if (Decimal.is32BitDecimal(precision)) { + return Decimal.fromUnscaledLong(precision, scale, getInt(rowId, colId)); + } else if (Decimal.is64BitDecimal(precision)) { + return Decimal.fromUnscaledLong(precision, scale, getLong(rowId, colId)); + } else { + byte[] bytes = getBytes(rowId, colId); + return Decimal.fromUnscaledBytes(precision, scale, bytes); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java new file mode 100644 index 0000000..2af5be5 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.dataformat.vector.AbstractColumnVector; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.Arrays; + +/** + * Heap vector that holds the shared nullable structure. + */ +public abstract class AbstractHeapVector extends AbstractColumnVector { + + /* + * If noNulls is false, then this array contains true if the value + * is null, otherwise false. The array is always allocated, so a batch can be re-used + * later and nulls added. + */ + protected boolean[] isNull; + + /** + * Reusable column for ids of dictionary. + */ + protected HeapIntVector dictionaryIds; + + public AbstractHeapVector(int len) { + isNull = new boolean[len]; + } + + /** + * Resets the column to default state. + * - fills the isNull array with false. + * - sets noNulls to true. + */ + @Override + public void reset() { + if (!noNulls) { + Arrays.fill(isNull, false); + } + noNulls = true; + } + + public void setNullAt(int i) { + isNull[i] = true; + noNulls = false; + } + + @Override + public boolean isNullAt(int i) { + return !noNulls && isNull[i]; + } + + @Override + public HeapIntVector reserveDictionaryIds(int capacity) { + if (dictionaryIds == null) { + dictionaryIds = new HeapIntVector(capacity); + } else { + dictionaryIds.reset(); + } + return dictionaryIds; + } + + /** + * Returns the underlying integer column for ids of dictionary. 
+ */ + public HeapIntVector getDictionaryIds() { + return dictionaryIds; + } + + public static AbstractHeapVector[] allocateHeapVectors(LogicalType[] fieldTypes, int maxRows) { + AbstractHeapVector[] columns = new AbstractHeapVector[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + columns[i] = createHeapColumn(fieldTypes[i], maxRows); + } + return columns; + } + + public static AbstractHeapVector createHeapColumn(LogicalType fieldType, int maxRows) { + switch (fieldType.getTypeRoot()) { + case BOOLEAN: + return new HeapBooleanVector(maxRows); + case TINYINT: + return new HeapByteVector(maxRows); + case DOUBLE: + return new HeapDoubleVector(maxRows); + case FLOAT: + return new HeapFloatVector(maxRows); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return new HeapIntVector(maxRows); + case BIGINT: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return new HeapLongVector(maxRows); + case DECIMAL: + DecimalType decimalType = (DecimalType) fieldType; + if (Decimal.is32BitDecimal(decimalType.getPrecision())) { + return new HeapIntVector(maxRows); + } else if (Decimal.is64BitDecimal(decimalType.getPrecision())) { + return new HeapLongVector(maxRows); + } else { + return new HeapBytesVector(maxRows); + } + case SMALLINT: + return new HeapShortVector(maxRows); + case VARCHAR: + case VARBINARY: + return new HeapBytesVector(maxRows); + default: + throw new UnsupportedOperationException(fieldType + " is not supported now."); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java new file mode 100644 index 0000000..52c8f0d1 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.BooleanColumnVector; + +/** + * This class represents a nullable heap boolean column vector. Values are read directly from the vector array; both dictionary accessors throw because this vector has no dictionary. + */ +public class HeapBooleanVector extends AbstractHeapVector implements BooleanColumnVector { + + private static final long serialVersionUID = 4131239076731313596L; + + public boolean[] vector; + + public HeapBooleanVector(int len) { /* len = number of rows */ + super(len); + vector = new boolean[len]; + } + + @Override + public HeapIntVector reserveDictionaryIds(int capacity) { + throw new RuntimeException("HeapBooleanVector has no dictionary."); + } + + @Override + public HeapIntVector getDictionaryIds() { + throw new RuntimeException("HeapBooleanVector has no dictionary."); + } + + @Override + public boolean getBoolean(int i) { + return vector[i]; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java new file mode 100644 index 0000000..e084554 --- /dev/null +++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.ByteColumnVector; + +/** + * This class represents a nullable byte column vector. When a dictionary is set, getByte decodes the stored id to an int and narrows it to byte. + */ +public class HeapByteVector extends AbstractHeapVector implements ByteColumnVector { + + private static final long serialVersionUID = 7216045902943789034L; + + public byte[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn calls this for TINYINT columns. 
+ * + * @param len the number of rows + */ + public HeapByteVector(int len) { + super(len); + vector = new byte[len]; + } + + @Override + public byte getByte(int i) { + if (dictionary == null) { + return vector[i]; + } else { + return (byte) dictionary.decodeToInt(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java new file mode 100644 index 0000000..7d3a992 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.BytesColumnVector; + +/** + * This class supports string and binary data by value reference -- i.e. each field is + * explicitly present, as opposed to provided by a dictionary reference. + * In some cases, all the values will be in the same byte array to begin with, + * but this need not be the case. 
If each value is in a separate byte + * array to start with, or not all of the values are in the same original + * byte array, you can still assign data by reference into this column vector. + * This gives flexibility to use this in multiple situations. + * + * <p>When setting data by reference, the caller + * is responsible for allocating the byte arrays used to hold the data. + * You can also set data by value, as long as you call the initBuffer() method first. + * You can mix "by value" and "by reference" in the same column vector, + * though that use is probably not typical. NOTE(review): this class declares neither initBuffer() nor setRef() itself; confirm these references. + */ +public class HeapBytesVector extends AbstractHeapVector implements BytesColumnVector { + + private static final long serialVersionUID = -8529155738773478597L; + + /** + * start offset of each field. + */ + public int[] start; + + /** + * The length of each field. + */ + public int[] length; + + /** + * buffer to use when actually copying in data. + */ + public byte[] buffer; + + /** + * Number of bytes of buffer already filled by setVal; the next value is copied in at this offset. + */ + private int elementsAppended = 0; + private int capacity; + + /** + * Allocates start/length entries for size elements and an initial buffer of size bytes, which setVal grows on demand. + * + * @param size number of elements in the column vector + */ + public HeapBytesVector(int size) { + super(size); + capacity = size; + buffer = new byte[capacity]; + start = new int[size]; + length = new int[size]; + } + + @Override + public void reset() { + super.reset(); + elementsAppended = 0; + } + + /** + * Set a field by actually copying in to a local buffer. + * If you must actually copy data in to the array, use this method. + * DO NOT USE this method unless it's not practical to set data by reference with setRef(). + * Setting data by reference tends to run a lot faster than copying data in. 
+ * + * @param elementNum index within column vector to set + * @param sourceBuf container of source data + * @param start start byte position within source + * @param length length of source byte sequence + */ + public void setVal(int elementNum, byte[] sourceBuf, int start, int length) { + reserve(elementsAppended + length); + System.arraycopy(sourceBuf, start, buffer, elementsAppended, length); + this.start[elementNum] = elementsAppended; + this.length[elementNum] = length; + elementsAppended += length; + } + + /** + * Set a field by actually copying in to a local buffer. + * If you must actually copy data in to the array, use this method. + * DO NOT USE this method unless it's not practical to set data by reference with setRef(). + * Setting data by reference tends to run a lot faster than copying data in. + * + * @param elementNum index within column vector to set + * @param sourceBuf container of source data + */ + public void setVal(int elementNum, byte[] sourceBuf) { + setVal(elementNum, sourceBuf, 0, sourceBuf.length); + } + + private void reserve(int requiredCapacity) { /* grows buffer to twice the request when it exceeds capacity, keeping the bytes appended so far */ + if (requiredCapacity > capacity) { + int newCapacity = requiredCapacity * 2; /* NOTE(review): int overflow of this doubling is not guarded; confirm expected buffer sizes */ + try { + byte[] newData = new byte[newCapacity]; + System.arraycopy(buffer, 0, newData, 0, elementsAppended); + buffer = newData; + capacity = newCapacity; + } catch (OutOfMemoryError outOfMemoryError) { + throw new UnsupportedOperationException(requiredCapacity + " cannot be satisfied.", outOfMemoryError); + } + } + } + + @Override + public Bytes getBytes(int i) { /* with a dictionary set, the bytes come from dictionary.decodeToBinary instead of buffer */ + if (dictionary == null) { + return new Bytes(buffer, start[i], length[i]); + } else { + byte[] bytes = dictionary.decodeToBinary(dictionaryIds.vector[i]); + return new Bytes(bytes, 0, bytes.length); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java new file mode 100644 index 0000000..9e83bf3 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.DoubleColumnVector; + +/** + * This class represents a nullable double precision floating point column vector. + * This class will be used for operations on all floating point double types + * and as such will use a 64-bit double value to hold the biggest possible value. + */ +public class HeapDoubleVector extends AbstractHeapVector implements DoubleColumnVector { + + private static final long serialVersionUID = 6193940154117411328L; + + public double[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn calls this for DOUBLE columns. 
+ * + * @param len the number of rows + */ + public HeapDoubleVector(int len) { + super(len); + vector = new double[len]; + } + + @Override + public double getDouble(int i) { /* reads vector[i], or decodes the id in dictionaryIds when a dictionary is set */ + if (dictionary == null) { + return vector[i]; + } else { + return dictionary.decodeToDouble(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java new file mode 100644 index 0000000..116f59a --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.FloatColumnVector; + +/** + * This class represents a nullable single precision floating point column vector. + * This class will be used for operations on all floating point float types. 
+ */ +public class HeapFloatVector extends AbstractHeapVector implements FloatColumnVector { + + private static final long serialVersionUID = 8928878923550041110L; + + public float[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn calls this for FLOAT columns. + * + * @param len the number of rows + */ + public HeapFloatVector(int len) { + super(len); + vector = new float[len]; + } + + @Override + public float getFloat(int i) { /* reads vector[i], or decodes the id in dictionaryIds when a dictionary is set */ + if (dictionary == null) { + return vector[i]; + } else { + return dictionary.decodeToFloat(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java new file mode 100644 index 0000000..dfb0aeb --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.IntColumnVector; + +/** + * This class represents a nullable int column vector. + */ +public class HeapIntVector extends AbstractHeapVector implements IntColumnVector { + + private static final long serialVersionUID = -2749499358889718254L; + + public int[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn and reserveDictionaryIds both construct vectors through it. + * + * @param len the number of rows + */ + public HeapIntVector(int len) { + super(len); + vector = new int[len]; + } + + @Override + public int getInt(int i) { /* reads vector[i], or decodes the id in dictionaryIds when a dictionary is set */ + if (dictionary == null) { + return vector[i]; + } else { + return dictionary.decodeToInt(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java new file mode 100644 index 0000000..479a592 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.LongColumnVector; + +/** + * This class represents a nullable long column vector. + */ +public class HeapLongVector extends AbstractHeapVector implements LongColumnVector { + + private static final long serialVersionUID = 8534925169458006397L; + + public long[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn calls this for BIGINT, TIMESTAMP_WITHOUT_TIME_ZONE and 64-bit DECIMAL columns. + * + * @param len the number of rows + */ + public HeapLongVector(int len) { + super(len); + vector = new long[len]; + } + + @Override + public long getLong(int i) { /* reads vector[i], or decodes the id in dictionaryIds when a dictionary is set */ + if (dictionary == null) { + return vector[i]; + } else { + return dictionary.decodeToLong(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java new file mode 100644 index 0000000..c4dbf83 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector.heap; + +import org.apache.flink.table.dataformat.vector.ShortColumnVector; + +/** + * This class represents a nullable short column vector. + */ +public class HeapShortVector extends AbstractHeapVector implements ShortColumnVector { + + private static final long serialVersionUID = -8278486456144676292L; + + public short[] vector; + + /** + * Allocates the value array for len rows; AbstractHeapVector.createHeapColumn calls this for SMALLINT columns. + * + * @param len the number of rows + */ + public HeapShortVector(int len) { + super(len); + vector = new short[len]; + } + + @Override + public short getShort(int i) { /* reads vector[i], or decodes the id to int and narrows it to short when a dictionary is set */ + if (dictionary == null) { + return vector[i]; + } else { + return (short) dictionary.decodeToInt(dictionaryIds.vector[i]); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java new file mode 100644 index 0000000..049b934 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat.vector; + +import org.apache.flink.table.dataformat.ColumnarRow; +import org.apache.flink.table.dataformat.vector.heap.HeapBooleanVector; +import org.apache.flink.table.dataformat.vector.heap.HeapByteVector; +import org.apache.flink.table.dataformat.vector.heap.HeapBytesVector; +import org.apache.flink.table.dataformat.vector.heap.HeapDoubleVector; +import org.apache.flink.table.dataformat.vector.heap.HeapFloatVector; +import org.apache.flink.table.dataformat.vector.heap.HeapIntVector; +import org.apache.flink.table.dataformat.vector.heap.HeapLongVector; +import org.apache.flink.table.dataformat.vector.heap.HeapShortVector; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test {@link VectorizedColumnBatch}: typed reads, null flags and dictionary-decoded ints, all read back through {@link ColumnarRow}. 
+ */ +public class VectorizedColumnBatchTest { + + private static final int VECTOR_SIZE = 1024; + + @Test + public void testTyped() { /* fills one vector per supported heap type and reads every row back */ + HeapBooleanVector col0 = new HeapBooleanVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col0.vector[i] = i % 2 == 0; + } + + HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + byte[] bytes = String.valueOf(i).getBytes(); + col1.setVal(i, bytes, 0, bytes.length); + } + + HeapByteVector col2 = new HeapByteVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col2.vector[i] = (byte) i; + } + + HeapDoubleVector col3 = new HeapDoubleVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col3.vector[i] = i; + } + + HeapFloatVector col4 = new HeapFloatVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col4.vector[i] = i; + } + + HeapIntVector col5 = new HeapIntVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col5.vector[i] = i; + } + + HeapLongVector col6 = new HeapLongVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col6.vector[i] = i; + } + + HeapShortVector col7 = new HeapShortVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + col7.vector[i] = (short) i; + } + + VectorizedColumnBatch batch = new VectorizedColumnBatch( + new ColumnVector[]{col0, col1, col2, col3, col4, col5, col6, col7}); + + for (int i = 0; i < VECTOR_SIZE; i++) { + ColumnarRow row = new ColumnarRow(batch, i); + assertEquals(i % 2 == 0, row.getBoolean(0)); + assertEquals(String.valueOf(i), row.getString(1).toString()); + assertEquals((byte) i, row.getByte(2)); + assertEquals((double) i, row.getDouble(3), 0); + assertEquals((float) i, row.getFloat(4), 0); + assertEquals(i, row.getInt(5)); + assertEquals((long) i, row.getLong(6)); + assertEquals((short) i, row.getShort(7)); + } + } + + @Test + public void testNull() { + // all null + HeapIntVector col0 = new HeapIntVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) 
{ + col0.setNullAt(i); + } + + // some null + HeapIntVector col1 = new HeapIntVector(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + if (i % 2 == 0) { + col1.setNullAt(i); + } else { + col1.vector[i] = i; + } + } + + VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col0, col1}); + + for (int i = 0; i < VECTOR_SIZE; i++) { + ColumnarRow row = new ColumnarRow(batch, i); + assertTrue(row.isNullAt(0)); + if (i % 2 == 0) { + assertTrue(row.isNullAt(1)); + } else { + assertEquals(i, row.getInt(1)); + } + } + } + + @Test + public void testDictionary() { + // dictionary-encoded ints: ids alternate 0/1 and must decode to 1998/9998 + HeapIntVector col = new HeapIntVector(VECTOR_SIZE); + int[] dict = new int[2]; + dict[0] = 1998; + dict[1] = 9998; + col.setDictionary(new TestDictionary(dict)); + HeapIntVector heapIntVector = col.reserveDictionaryIds(VECTOR_SIZE); + for (int i = 0; i < VECTOR_SIZE; i++) { + heapIntVector.vector[i] = i % 2 == 0 ? 0 : 1; + } + + VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col}); + + for (int i = 0; i < VECTOR_SIZE; i++) { + ColumnarRow row = new ColumnarRow(batch, i); + if (i % 2 == 0) { + assertEquals(1998, row.getInt(0)); + } else { + assertEquals(9998, row.getInt(0)); + } + } + } + + private static final class TestDictionary implements Dictionary { /* int-only dictionary; every other decode method throws */ + private final int[] intDictionary; + + public TestDictionary(int[] dictionary) { + this.intDictionary = dictionary; + } + + @Override + public int decodeToInt(int id) { + return intDictionary[id]; + } + + @Override + public long decodeToLong(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support long"); + } + + @Override + public float decodeToFloat(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support float"); + } + + @Override + public double decodeToDouble(int id) { + throw new UnsupportedOperationException("Dictionary encoding does not support double"); + } + + @Override + public byte[] decodeToBinary(int id) { + throw new 
UnsupportedOperationException("Dictionary encoding does not support String"); + } + } +}