Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r229284011
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.DecimalType;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.scan.executor.QueryExecutor;
    +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.core.scan.model.ProjectionDimension;
    +import org.apache.carbondata.core.scan.model.ProjectionMeasure;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
    +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
    +import org.apache.carbondata.core.util.ByteUtil;
    +import org.apache.carbondata.hadoop.AbstractRecordReader;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.log4j.Logger;
    +
    +/**
    + * A specialized RecordReader that reads into CarbonColumnarBatches directly using the
    + * carbondata column APIs and fills the data directly into columns.
    + */
    +public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
    +
    +  private static final Logger LOGGER =
    +      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
    +
    +  private CarbonColumnarBatch carbonColumnarBatch;
    +
    +  private QueryExecutor queryExecutor;
    +
    +  private int batchIdx = 0;
    +
    +  private int numBatched = 0;
    +
    +  private AbstractDetailQueryResultIterator iterator;
    +
    +  private QueryModel queryModel;
    +
    +  public CarbonVectorizedRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException {
    +    List<CarbonInputSplit> splitList;
    +    if (inputSplit instanceof CarbonInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonInputSplit) inputSplit);
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    queryModel.setVectorReader(true);
    +    try {
    +      queryExecutor =
    +          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
    +      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
    +    } catch (QueryExecutionException e) {
    +      LOGGER.error(e);
    +      throw new InterruptedException(e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e);
    +      throw e;
    +    }
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +    initBatch();
    +    if (batchIdx >= numBatched) {
    +      if (!nextBatch()) return false;
    +    }
    +    ++batchIdx;
    +    return true;
    +  }
    +
    +
    +  private boolean nextBatch() {
    +    carbonColumnarBatch.reset();
    +    if (iterator.hasNext()) {
    +      iterator.processNextBatch(carbonColumnarBatch);
    +      numBatched = carbonColumnarBatch.getActualSize();
    +      batchIdx = 0;
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  private void initBatch() {
    +    if (carbonColumnarBatch == null) {
    +      List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
    +      List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
    +      StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
    +      for (ProjectionDimension dim : queryDimension) {
    +        fields[dim.getOrdinal()] =
    +            new StructField(dim.getColumnName(), dim.getDimension().getDataType());
    +      }
    +      for (ProjectionMeasure msr : queryMeasures) {
    +        DataType dataType = msr.getMeasure().getDataType();
    +        if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT
    +            || dataType == DataTypes.INT || dataType == DataTypes.LONG
    +            || dataType == DataTypes.FLOAT || dataType == DataTypes.BYTE) {
    +          fields[msr.getOrdinal()] =
    +              new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
    +        } else if (DataTypes.isDecimal(dataType)) {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
    +              new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()));
    +        } else {
    +          fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
    +        }
    +      }
    +      CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
    +      for (int i = 0; i < fields.length; i++) {
    +        vectors[i] = new CarbonColumnVectorImpl(
    +            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +            fields[i].getDataType());
    +      }
    +      carbonColumnarBatch = new CarbonColumnarBatch(vectors,
    +          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
    +          new boolean[] {});
    +    }
    +  }
    +
    +  @Override
  public Object getCurrentValue() throws IOException, InterruptedException {
    +    rowCount += 1;
    +    Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
    +    for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
    +      if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
    +          || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
    +        byte[] data = (byte[]) carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +        row[i] = ByteUtil.toString(data, 0, data.length);
    +      } else {
    +        row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
    +      }
    +    }
    +    return row;
    +  }
    +
    +  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    +    return null;
    --- End diff ---
    
    Don't return null. Throw an exception if the operation is not supported.
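    
    For example, the override could fail loudly instead of handing back null. A minimal sketch of that suggestion, assuming UnsupportedOperationException is an acceptable choice (the exact exception type and message wording are not part of the review, just illustration):
    
        @Override
        public Void getCurrentKey() throws IOException, InterruptedException {
          // Keys are never produced by this reader; throwing here (an assumed
          // UnsupportedOperationException) surfaces misuse at the call site
          // instead of deferring it to a later NullPointerException.
          throw new UnsupportedOperationException(
              "getCurrentKey is not supported by " + CarbonVectorizedRecordReader.class.getName());
        }
    
    Since the exception is unchecked, the existing IOException/InterruptedException signature does not need to change.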

