http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 5d927df..73da878 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; @@ -100,7 +100,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { /** * Implementation of RecordReader API. */ - @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { // The input split can contain single HDFS block or multiple blocks, so firstly get all the // blocks and then set them in the query model. @@ -145,7 +146,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { } } - @Override public void close() throws IOException { + @Override + public void close() throws IOException { logStatistics(rowCount, queryModel.getStatisticsRecorder()); if (columnarBatch != null) { columnarBatch.close(); @@ -165,10 +167,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { } } - @Override public boolean nextKeyValue() throws IOException, InterruptedException { + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { resultBatch(); - if (returnColumnarBatch) return nextBatch(); + if (returnColumnarBatch) { + return nextBatch(); + } if (batchIdx >= numBatched) { if (!nextBatch()) return false; @@ -177,7 +182,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { return true; } - @Override public Object getCurrentValue() throws IOException, InterruptedException { + @Override + public Object getCurrentValue() throws IOException, InterruptedException { if (returnColumnarBatch) { int value = columnarBatch.numValidRows(); rowCount += value; @@ -190,11 +196,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { return columnarBatch.getRow(batchIdx - 1); } - @Override public Void getCurrentKey() throws IOException, InterruptedException { + @Override + public Void getCurrentKey() throws IOException, InterruptedException { return null; } - @Override public float getProgress() throws IOException, InterruptedException { + @Override + public float getProgress() throws IOException, InterruptedException { // TODO : Implement it based on total number of rows it is going to retrive. return 0; } @@ -206,44 +214,44 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { */ private void initBatch(MemoryMode memMode) { - List<QueryDimension> queryDimension = queryModel.getQueryDimension(); - List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures(); + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; for (int i = 0; i < queryDimension.size(); i++) { - QueryDimension dim = queryDimension.get(i); + ProjectionDimension dim = queryDimension.get(i); if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dim.getDimension().getDataType()); - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null); } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true, null); } else if (dim.getDimension().isComplex()) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true, null); } else { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null); } } for (int i = 0; i < queryMeasures.size(); i++) { - QueryMeasure msr = queryMeasures.get(i); + ProjectionMeasure msr = queryMeasures.get(i); DataType dataType = msr.getMeasure().getDataType(); if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true, null); } else if (DataTypes.isDecimal(dataType)) { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true, null); } else { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null); } } @@ -261,9 +269,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { initBatch(DEFAULT_MEMORY_MODE); } - private ColumnarBatch resultBatch() { + private void resultBatch() { if (columnarBatch == null) initBatch(); - return columnarBatch; } /*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index f51ced3..6a401d8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.DataTypeConverter; /** * Executor class for executing the query on the selected segments to be merged. @@ -70,6 +66,9 @@ public class CarbonCompactionExecutor { */ private boolean restructuredBlockExists; + // converter for UTF8String and decimal conversion + private DataTypeConverter dataTypeConverter; + /** * Constructor * @@ -82,13 +81,14 @@ public class CarbonCompactionExecutor { public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping, SegmentProperties segmentProperties, CarbonTable carbonTable, Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, - boolean restructuredBlockExists) { + boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) { this.segmentMapping = segmentMapping; this.destinationSegProperties = segmentProperties; this.carbonTable = carbonTable; this.dataFileMetadataSegMapping = dataFileMetadataSegMapping; this.restructuredBlockExists = restructuredBlockExists; - queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.dataTypeConverter = dataTypeConverter; } /** @@ -100,7 +100,9 @@ public class CarbonCompactionExecutor { List<RawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter); + queryModel.setReadPageByPage(enablePageLevelReaderForCompaction()); + queryModel.setForcedDetailRawQuery(true); // iterate each seg ID for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) { String segmentId = taskMap.getKey(); @@ -156,7 +158,7 @@ public class CarbonCompactionExecutor { * @param blockList * @return */ - private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -195,48 +197,6 @@ public class CarbonCompactionExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - model.setConverter(DataTypeUtil.getDataTypeConverter()); - model.setReadPageByPage(enablePageLevelReaderForCompaction()); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Whether to enable page level reader for compaction or not. */ private boolean enablePageLevelReaderForCompaction() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java index 79e9e5a..b6f12a5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.processing.partition.DataPartitioner; import org.apache.carbondata.processing.partition.Partition; @@ -46,9 +45,8 @@ public final class QueryPartitionHelper { /** * Get partitions applicable for query based on filters applied in query */ - public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) { - String tableUniqueName = - CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName()); + public List<Partition> getPartitionsForQuery(String databaseName, String tableName) { + String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName); DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java index 36e022b..01db4f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.partition.spliter; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.util.CarbonUtil; public abstract class AbstractCarbonQueryExecutor { @@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor { LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName()); protected CarbonTable carbonTable; protected QueryModel queryModel; - protected QueryExecutor queryExecutor; - protected Map<String, TaskBlockInfo> segmentMapping; + private QueryExecutor queryExecutor; + Map<String, TaskBlockInfo> segmentMapping; /** * get executor and execute the query model. @@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor { * @param blockList * @return */ - protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Below method will be used * for cleanup */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java index 6afec0b..b18207d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator; +import org.apache.carbondata.core.util.DataTypeConverterImpl; /** * Used to read carbon blocks when add/split partition @@ -48,7 +49,8 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor { public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId) throws QueryExecutionException, IOException { List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl()); + queryModel.setForcedDetailRawQuery(true); List<PartitionSpliterRawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId); http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java index ec91472..4abdf3c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; +import org.apache.carbondata.core.scan.model.QueryProjection; import org.apache.carbondata.processing.partition.Partition; import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer; import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl; @@ -46,7 +46,7 @@ public class CarbonQueryUtil { * It creates the one split for each region server. */ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName, - CarbonQueryPlan queryPlan) { + QueryProjection queryPlan) { //Just create splits depends on locations of region servers List<Partition> allPartitions = null; @@ -55,7 +55,7 @@ public class CarbonQueryUtil { QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName); } else { allPartitions = - QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan); + QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName); } TableSplit[] splits = new TableSplit[allPartitions.size()]; for (int i = 0; i < splits.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 36a5a15..197cb14 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -150,7 +150,7 @@ class StreamHandoffRDD[K, V]( CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) val format = new CarbonTableInputFormat[Array[Object]]() - val model = format.getQueryModel(inputSplit, attemptContext) + val model = format.createQueryModel(inputSplit, attemptContext) val inputFormat = new CarbonStreamInputFormat val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) .asInstanceOf[CarbonStreamRecordReader]