Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2869#discussion_r229254647
  
    --- Diff: 
hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
 ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.util;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.datatype.DecimalType;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.scan.executor.QueryExecutor;
    +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    +import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.core.scan.model.ProjectionDimension;
    +import org.apache.carbondata.core.scan.model.ProjectionMeasure;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import 
org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
    +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
    +import 
org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
    +import org.apache.carbondata.core.util.ByteUtil;
    +import org.apache.carbondata.hadoop.AbstractRecordReader;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.log4j.Logger;
    +
    +/**
    + * A specialized RecordReader that reads into CarbonColumnarBatches 
directly using the
    + * carbondata column APIs and fills the data directly into columns.
    + */
    +public class CarbonVectorizedRecordReader extends 
AbstractRecordReader<Object> {
    +
    +  private static final Logger LOGGER =
    +      
LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
    +
    +  private CarbonColumnarBatch carbonColumnarBatch;
    +
    +  private QueryExecutor queryExecutor;
    +
    +  private int batchIdx = 0;
    +
    +  private int numBatched = 0;
    +
    +  private AbstractDetailQueryResultIterator iterator;
    +
    +  private QueryModel queryModel;
    +
  /**
   * Creates a vectorized reader for the given query model. The model is only
   * stored here; table block infos and vector-read mode are configured later,
   * in {@code initialize(InputSplit, TaskAttemptContext)}.
   */
  public CarbonVectorizedRecordReader(QueryModel queryModel) {
    this.queryModel = queryModel;
  }
    +
    +  @Override public void initialize(InputSplit inputSplit, 
TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException {
    +    List<CarbonInputSplit> splitList;
    +    if (inputSplit instanceof CarbonInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonInputSplit) inputSplit);
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + 
inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = 
CarbonInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    queryModel.setVectorReader(true);
    +    try {
    +      queryExecutor =
    +          QueryExecutorFactory.getQueryExecutor(queryModel, 
taskAttemptContext.getConfiguration());
    +      iterator = (AbstractDetailQueryResultIterator) 
queryExecutor.execute(queryModel);
    +    } catch (QueryExecutionException e) {
    +      LOGGER.error(e);
    +      throw new InterruptedException(e.getMessage());
    +    } catch (Exception e) {
    +      LOGGER.error(e);
    +      throw e;
    +    }
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, 
InterruptedException {
    +    initBatch();
    --- End diff --
    
    Please move the initBatch() call out of nextKeyValue() and into the initialize() method, so batch setup happens once during reader initialization.


---

Reply via email to