akashrn5 commented on a change in pull request #3887:
URL: https://github.com/apache/carbondata/pull/3887#discussion_r475768161



##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildrenElementsForArray(byte[] childPageData, int pageSize) {

Review comment:
       ```suggestion
    public void setNumberOfChildElementsForArray(byte[] childPageData, int pageSize) {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildrenElementsForArray(byte[] childPageData, int pageSize) {
+    // for complex array type, go through parent page to get the child information
+    ByteBuffer childInfoBuffer = ByteBuffer.wrap(childPageData);
+    ArrayList<Integer> childElementsForEachRow = new ArrayList<>();
+    // osset will be an INT size and value will be another INT size, hence 2 * INT size

Review comment:
       ```suggestion
      // offset will be an INT size and value will be another INT size, hence 2 * INT size
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
##########
@@ -260,4 +265,52 @@ protected String debugInfo() {
     return this.toString();
   }
 
+  // Utility class to update current vector to child vector in case of complex type handling
+  public static class VectorUtil {
+    private ColumnVectorInfo vectorInfo;
+    private int pageSize;
+    private CarbonColumnVector vector;
+    private DataType vectorDataType;
+
+    public VectorUtil(ColumnVectorInfo vectorInfo, int pageSize, CarbonColumnVector vector,
+        DataType vectorDataType) {
+      this.vectorInfo = vectorInfo;
+      this.pageSize = pageSize;
+      this.vector = vector;
+      this.vectorDataType = vectorDataType;
+    }
+
+    public int getPageSize() {
+      return pageSize;
+    }
+
+    public CarbonColumnVector getVector() {
+      return vector;
+    }
+
+    public DataType getVectorDataType() {
+      return vectorDataType;
+    }
+
+    public VectorUtil checkAndUpdateToChildVector() {
+      Stack<CarbonColumnVector> vectorStack = vectorInfo.getVectorStack();
+      if (vectorStack != null && vectorStack.peek() != null && vectorDataType.isComplexType()) {
+        if (vectorDataType.getName().equals("ARRAY")) {
+          ArrayList<Integer> childElementsForEachRow =
+              ((CarbonColumnVectorImpl) vector.getColumnVector())
+                  .getNumberOfChildrenElementsInEachRow();
+          int pageSizeNew = 0;

Review comment:
       Rename it to `newPageSize`.

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
##########
@@ -98,6 +98,14 @@ void prepareDimensionAndMeasureColumnVectors() {
         columnVectorInfo.dimension = queryDimensions[i];
        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
         allColumnInfo[queryDimensions[i].getOrdinal()] = columnVectorInfo;
+      } else if (queryDimensions[i].getDimension().isComplex()) {

Review comment:
       Why is this change required? As far as I can see, it is just the existing code moved up into an else-if branch.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import io.prestosql.spi.block.ArrayBlock;
+import io.prestosql.spi.block.RowBlock;
+import io.prestosql.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type;
+  protected BlockBuilder builder;
+
+  public ComplexTypeStreamReader(int batchSize, StructField field) {
+    super(batchSize, field.getDataType());
+    this.batchSize = batchSize;
+    this.type = getType(field);
+    ArrayList<CarbonColumnVector> childrenList = new ArrayList<>();
+    for (StructField child : field.getChildren()) {
+      childrenList.add(new ColumnarVectorWrapperDirect(
+          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child)));
+    }
+    setChildrenVector(childrenList);
+    this.builder = type.createBlockBuilder(null, batchSize);
+  }
+
+  Type getType(StructField field) {
+    DataType dataType = field.getDataType();
+    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+      return VarcharType.VARCHAR;
+    } else if (dataType == DataTypes.SHORT) {
+      return SmallintType.SMALLINT;
+    } else if (dataType == DataTypes.INT) {
+      return IntegerType.INTEGER;
+    } else if (dataType == DataTypes.LONG) {
+      return BigintType.BIGINT;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return DoubleType.DOUBLE;
+    } else if (dataType == DataTypes.FLOAT) {
+      return RealType.REAL;
+    } else if (dataType == DataTypes.BOOLEAN) {
+      return BooleanType.BOOLEAN;
+    } else if (dataType == DataTypes.BINARY) {
+      return VarbinaryType.VARBINARY;
+    } else if (dataType == DataTypes.DATE) {
+      return DateType.DATE;
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return TimestampType.TIMESTAMP;
+    } else if (dataType == DataTypes.BYTE) {
+      return TinyintType.TINYINT;
+    } else if (DataTypes.isDecimal(dataType)) {
+      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
+          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
+      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+    } else if (DataTypes.isArrayType(dataType)) {
+      return new ArrayType(getType(field.getChildren().get(0)));
+    } else if (DataTypes.isStructType(dataType)) {
+      List<RowType.Field> children = new ArrayList<>();
+      for (StructField child : field.getChildren()) {
+        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
+      }
+      return RowType.from(children);
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + dataType);
+    }
+  }
+
+  @Override public Block buildBlock() {
+    return builder.build();
+  }
+
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      getChildrenVector().get(0).putObject(rowId, value);
+    }
+  }
+
+  public void putComplexObject(List<Integer> offsetVector) {
+    if (type instanceof ArrayType) {
+      Block childBlock = buildChildBlock(getChildrenVector().get(0));
+      // prepare an offset vector with 0 as initial offset
+      int[] offsetVectorArray = new int[offsetVector.size() + 1];
+      for (int i = 1; i <= offsetVector.size(); i++) {
+        offsetVectorArray[i] = offsetVectorArray[i -1] + offsetVector.get(i - 1);
+      }
+      Block arrayBlock = ArrayBlock
+          .fromElementBlock(offsetVector.size(), Optional.ofNullable(null), offsetVectorArray,

Review comment:
       Replace `Optional.ofNullable(null)` with `Optional.empty()`.
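       For reference, a minimal sketch of the change (the quoted hunk cuts the call off, so the last argument is assumed to be the `childBlock` built just above):

```java
// Sketch only: Optional.empty() expresses "no null mask" without boxing a null.
Block arrayBlock = ArrayBlock.fromElementBlock(
    offsetVector.size(), Optional.empty(), offsetVectorArray, childBlock);
```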

##########
File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
##########
@@ -260,4 +265,52 @@ protected String debugInfo() {
     return this.toString();
   }
 
+  // Utility class to update current vector to child vector in case of complex type handling
+  public static class VectorUtil {
+    private ColumnVectorInfo vectorInfo;
+    private int pageSize;
+    private CarbonColumnVector vector;
+    private DataType vectorDataType;
+
+    public VectorUtil(ColumnVectorInfo vectorInfo, int pageSize, CarbonColumnVector vector,
+        DataType vectorDataType) {
+      this.vectorInfo = vectorInfo;
+      this.pageSize = pageSize;
+      this.vector = vector;
+      this.vectorDataType = vectorDataType;
+    }
+
+    public int getPageSize() {
+      return pageSize;
+    }
+
+    public CarbonColumnVector getVector() {
+      return vector;
+    }
+
+    public DataType getVectorDataType() {
+      return vectorDataType;
+    }
+
+    public VectorUtil checkAndUpdateToChildVector() {
+      Stack<CarbonColumnVector> vectorStack = vectorInfo.getVectorStack();
+      if (vectorStack != null && vectorStack.peek() != null && vectorDataType.isComplexType()) {
+        if (vectorDataType.getName().equals("ARRAY")) {

Review comment:
       Can you add `ARRAY` and `STRUCT` as string constants for the complex types?
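       Something along these lines, as a sketch (constant names and where they live are only illustrative, not from this PR):

```java
// Hypothetical constants; the point is to avoid repeating the "ARRAY"/"STRUCT" literals.
public static final String COMPLEX_TYPE_ARRAY = "ARRAY";
public static final String COMPLEX_TYPE_STRUCT = "STRUCT";

// the branch above would then read:
// if (vectorDataType.getName().equals(COMPLEX_TYPE_ARRAY)) { ... }
```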

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType 
dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildrenElementsForArray(byte[] childPageData, int pageSize) {
+    // for complex array type, go through parent page to get the child information
+    ByteBuffer childInfoBuffer = ByteBuffer.wrap(childPageData);
+    ArrayList<Integer> childElementsForEachRow = new ArrayList<>();
+    // osset will be an INT size and value will be another INT size, hence 2 * INT size

Review comment:
       If you briefly document the layout of this information and how it is stored in the parent page data, I feel it will help anyone reading this to understand it.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
##########
@@ -39,6 +40,15 @@
   public int[] invertedIndex;
   public BitSet deletedRows;
   public DecimalConverterFactory.DecimalConverter decimalConverter;
+  public Stack<CarbonColumnVector> vectorStack = null;

Review comment:
       Please add a comment for this field, like you did in the PR description.
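       For example (a sketch only; the wording is mine, inferred from how the stack is used in `AdaptiveCodec.VectorUtil`, so please use the description from the PR instead):

```java
/**
 * Sketch of a possible comment: stack of column vectors used while decoding complex
 * (array/struct) columns, so that codecs can switch from the current vector to the
 * corresponding child vector while filling the page.
 */
public Stack<CarbonColumnVector> vectorStack = null;
```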

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {

Review comment:
       ```suggestion
    public void setNumberOfChildElementsInEachRow(ArrayList<Integer> childrenElements) {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {

Review comment:
       ```suggestion
     public ArrayList<Integer> getNumberOfChildElementsInEachRow() {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildrenElementsForArray(byte[] childPageData, int pageSize) {
+    // for complex array type, go through parent page to get the child information
+    ByteBuffer childInfoBuffer = ByteBuffer.wrap(childPageData);
+    ArrayList<Integer> childElementsForEachRow = new ArrayList<>();
+    // osset will be an INT size and value will be another INT size, hence 2 * INT size
+    while (childInfoBuffer.remaining() >= 2 * DataTypes.INT.getSizeInBytes()) {
+      int elements = childInfoBuffer.getInt();
+      if (elements == 0) {
+        break;
+      }
+      childElementsForEachRow.add(elements);
+      // skip offset
+      childInfoBuffer.getInt();
+      if (pageSize == childElementsForEachRow.size()) {
+        break;
+      }
+    }
+    setNumberOfChildrenElementsInEachRow(childElementsForEachRow);
+  }
+
+  public void setNumberOfChildrenElementsForStruct(byte[] childPageData, int pageSize) {

Review comment:
       ```suggestion
    public void setNumberOfChildElementsForStruct(byte[] childPageData, int pageSize) {
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
##########
@@ -102,6 +109,57 @@ public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
 
   }
 
+  @Override
+  public List<CarbonColumnVector> getChildrenVector() {
+    return childrenVector;
+  }
+
+  public void setChildrenVector(ArrayList<CarbonColumnVector> childrenVector) {
+    this.childrenVector = childrenVector;
+  }
+
+  public ArrayList<Integer> getNumberOfChildrenElementsInEachRow() {
+    return childElementsForEachRow;
+  }
+
+  public void setNumberOfChildrenElementsInEachRow(ArrayList<Integer> childrenElements) {
+    this.childElementsForEachRow = childrenElements;
+  }
+
+  public void setNumberOfChildrenElementsForArray(byte[] childPageData, int pageSize) {

Review comment:
       Since we are sending the parent page data here from DirectCompressCodec, we should rename `childPageData` to `parentPageData`; the current name gives the wrong impression. Am I right? Please change it in the other method too.

##########
File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
##########
@@ -260,4 +265,52 @@ protected String debugInfo() {
     return this.toString();
   }
 
+  // Utility class to update current vector to child vector in case of complex type handling
+  public static class VectorUtil {
+    private ColumnVectorInfo vectorInfo;
+    private int pageSize;
+    private CarbonColumnVector vector;
+    private DataType vectorDataType;
+
+    public VectorUtil(ColumnVectorInfo vectorInfo, int pageSize, CarbonColumnVector vector,
+        DataType vectorDataType) {
+      this.vectorInfo = vectorInfo;
+      this.pageSize = pageSize;
+      this.vector = vector;
+      this.vectorDataType = vectorDataType;
+    }
+
+    public int getPageSize() {
+      return pageSize;
+    }
+
+    public CarbonColumnVector getVector() {
+      return vector;
+    }
+
+    public DataType getVectorDataType() {
+      return vectorDataType;
+    }
+
+    public VectorUtil checkAndUpdateToChildVector() {
+      Stack<CarbonColumnVector> vectorStack = vectorInfo.getVectorStack();
+      if (vectorStack != null && vectorStack.peek() != null && vectorDataType.isComplexType()) {
+        if (vectorDataType.getName().equals("ARRAY")) {
+          ArrayList<Integer> childElementsForEachRow =
+              ((CarbonColumnVectorImpl) vector.getColumnVector())
+                  .getNumberOfChildrenElementsInEachRow();
+          int pageSizeNew = 0;
+          for (int val : childElementsForEachRow) {

Review comment:
       You can replace `val` with `noOfChildElements` or `noOfChild`.
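       With both renames applied it would read roughly as below (sketch only; the loop body is assumed to simply sum the per-row child counts, which the quoted hunk cuts off):

```java
int newPageSize = 0;
for (int noOfChildElements : childElementsForEachRow) {
  newPageSize += noOfChildElements;
}
```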

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import io.prestosql.spi.block.ArrayBlock;
+import io.prestosql.spi.block.RowBlock;
+import io.prestosql.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type;
+  protected BlockBuilder builder;
+
+  public ComplexTypeStreamReader(int batchSize, StructField field) {
+    super(batchSize, field.getDataType());
+    this.batchSize = batchSize;
+    this.type = getType(field);
+    ArrayList<CarbonColumnVector> childrenList = new ArrayList<>();
+    for (StructField child : field.getChildren()) {
+      childrenList.add(new ColumnarVectorWrapperDirect(
+          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child)));
+    }
+    setChildrenVector(childrenList);
+    this.builder = type.createBlockBuilder(null, batchSize);
+  }
+
+  Type getType(StructField field) {
+    DataType dataType = field.getDataType();
+    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+      return VarcharType.VARCHAR;
+    } else if (dataType == DataTypes.SHORT) {
+      return SmallintType.SMALLINT;
+    } else if (dataType == DataTypes.INT) {
+      return IntegerType.INTEGER;
+    } else if (dataType == DataTypes.LONG) {
+      return BigintType.BIGINT;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return DoubleType.DOUBLE;
+    } else if (dataType == DataTypes.FLOAT) {
+      return RealType.REAL;
+    } else if (dataType == DataTypes.BOOLEAN) {
+      return BooleanType.BOOLEAN;
+    } else if (dataType == DataTypes.BINARY) {
+      return VarbinaryType.VARBINARY;
+    } else if (dataType == DataTypes.DATE) {
+      return DateType.DATE;
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return TimestampType.TIMESTAMP;
+    } else if (dataType == DataTypes.BYTE) {
+      return TinyintType.TINYINT;
+    } else if (DataTypes.isDecimal(dataType)) {
+      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
+          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
+      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+    } else if (DataTypes.isArrayType(dataType)) {
+      return new ArrayType(getType(field.getChildren().get(0)));
+    } else if (DataTypes.isStructType(dataType)) {
+      List<RowType.Field> children = new ArrayList<>();
+      for (StructField child : field.getChildren()) {
+        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
+      }
+      return RowType.from(children);
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + dataType);
+    }
+  }
+
+  @Override public Block buildBlock() {
+    return builder.build();
+  }
+
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      getChildrenVector().get(0).putObject(rowId, value);
+    }
+  }
+
+  public void putComplexObject(List<Integer> offsetVector) {
+    if (type instanceof ArrayType) {
+      Block childBlock = buildChildBlock(getChildrenVector().get(0));
+      // prepare an offset vector with 0 as initial offset
+      int[] offsetVectorArray = new int[offsetVector.size() + 1];
+      for (int i = 1; i <= offsetVector.size(); i++) {
+        offsetVectorArray[i] = offsetVectorArray[i -1] + offsetVector.get(i - 1);

Review comment:
       Please correct the formatting here: `i -1` should be `i - 1`.
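       i.e. only the spacing changes:

```java
// prepare an offset vector with 0 as initial offset
int[] offsetVectorArray = new int[offsetVector.size() + 1];
for (int i = 1; i <= offsetVector.size(); i++) {
  offsetVectorArray[i] = offsetVectorArray[i - 1] + offsetVector.get(i - 1);
}
```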

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import io.prestosql.spi.block.ArrayBlock;
+import io.prestosql.spi.block.RowBlock;
+import io.prestosql.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type;
+  protected BlockBuilder builder;
+
+  public ComplexTypeStreamReader(int batchSize, StructField field) {
+    super(batchSize, field.getDataType());
+    this.batchSize = batchSize;
+    this.type = getType(field);
+    ArrayList<CarbonColumnVector> childrenList = new ArrayList<>();
+    for (StructField child : field.getChildren()) {
+      childrenList.add(new ColumnarVectorWrapperDirect(
+          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child)));

Review comment:
       `CarbonVectorBatch.createDirectStreamReader` can return null, so to avoid the warning you can use the IntelliJ suggestion of `Objects.requireNonNull`.

##########
File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
##########
@@ -255,6 +255,12 @@ public void decodeAndFillVector(byte[] pageData, ColumnVectorInfo vectorInfo, Bi
       CarbonColumnVector vector = vectorInfo.vector;
       BitSet deletedRows = vectorInfo.deletedRows;
       DataType vectorDataType = vector.getType();
+      VectorUtil vectorUtil = new VectorUtil(vectorInfo, pageSize, vector, vectorDataType)

Review comment:
       Can we extract lines 258 to 265 into a method in `AdaptiveCodec`? These lines are identical in all the adaptive codec implementations, so it will be cleaner.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import io.prestosql.spi.block.ArrayBlock;
+import io.prestosql.spi.block.RowBlock;
+import io.prestosql.spi.type.*;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+
+import org.apache.carbondata.presto.CarbonVectorBatch;
+import org.apache.carbondata.presto.ColumnarVectorWrapperDirect;
+
+/**
+ * Class to read the complex type Stream [array/struct/map]
+ */
+
+public class ComplexTypeStreamReader extends CarbonColumnVectorImpl
+    implements PrestoVectorBlockBuilder {
+
+  protected int batchSize;
+
+  protected Type type;
+  protected BlockBuilder builder;
+
+  public ComplexTypeStreamReader(int batchSize, StructField field) {
+    super(batchSize, field.getDataType());
+    this.batchSize = batchSize;
+    this.type = getType(field);
+    ArrayList<CarbonColumnVector> childrenList = new ArrayList<>();
+    for (StructField child : field.getChildren()) {
+      childrenList.add(new ColumnarVectorWrapperDirect(
+          CarbonVectorBatch.createDirectStreamReader(this.batchSize, child.getDataType(), child)));
+    }
+    setChildrenVector(childrenList);
+    this.builder = type.createBlockBuilder(null, batchSize);
+  }
+
+  Type getType(StructField field) {
+    DataType dataType = field.getDataType();
+    if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR) {
+      return VarcharType.VARCHAR;
+    } else if (dataType == DataTypes.SHORT) {
+      return SmallintType.SMALLINT;
+    } else if (dataType == DataTypes.INT) {
+      return IntegerType.INTEGER;
+    } else if (dataType == DataTypes.LONG) {
+      return BigintType.BIGINT;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return DoubleType.DOUBLE;
+    } else if (dataType == DataTypes.FLOAT) {
+      return RealType.REAL;
+    } else if (dataType == DataTypes.BOOLEAN) {
+      return BooleanType.BOOLEAN;
+    } else if (dataType == DataTypes.BINARY) {
+      return VarbinaryType.VARBINARY;
+    } else if (dataType == DataTypes.DATE) {
+      return DateType.DATE;
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return TimestampType.TIMESTAMP;
+    } else if (dataType == DataTypes.BYTE) {
+      return TinyintType.TINYINT;
+    } else if (DataTypes.isDecimal(dataType)) {
+      org.apache.carbondata.core.metadata.datatype.DecimalType decimal =
+          (org.apache.carbondata.core.metadata.datatype.DecimalType) dataType;
+      return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+    } else if (DataTypes.isArrayType(dataType)) {
+      return new ArrayType(getType(field.getChildren().get(0)));
+    } else if (DataTypes.isStructType(dataType)) {
+      List<RowType.Field> children = new ArrayList<>();
+      for (StructField child : field.getChildren()) {
+        children.add(new RowType.Field(Optional.of(child.getFieldName()), getType(child)));
+      }
+      return RowType.from(children);
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + dataType);
+    }
+  }
+
+  @Override public Block buildBlock() {
+    return builder.build();
+  }
+
+  @Override public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      getChildrenVector().get(0).putObject(rowId, value);

Review comment:
       Why always `get(0)` for putObject? Shouldn't it be based on the data type (struct or array), since this class is common to all complex stream readers?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

