http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java new file mode 100644 index 0000000..defc0d4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ResolvedFilterInfoVisitorIntf.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.filter.resolver.resolverinfo.visitor; + +import java.io.IOException; + +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.resolver.metadata.FilterResolverMetadata; +import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; + +public interface ResolvedFilterInfoVisitorIntf { + + /** + * The visitor pattern is used in this scenario in order to populate the + * dimColResolvedFilterInfo visitable object with filter member values based + * on the visitor type. Currently there are 3 types of visitors: custom, direct + * and no dictionary. Each type of visitor populates the visitable instance + * as per its own business logic, which is different for every visitor. + * + * @param visitableObj the visitable object to be populated with filter member values + * @param metadata the filter resolver metadata handed to the visitor + * @throws FilterUnsupportedException + * @throws IOException + */ + void populateFilterResolvedInfo(DimColumnResolvedFilterInfo visitableObj, + FilterResolverMetadata metadata) throws FilterUnsupportedException, IOException; +}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/model/CarbonQueryPlan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/CarbonQueryPlan.java b/core/src/main/java/org/apache/carbondata/core/scan/model/CarbonQueryPlan.java new file mode 100644 index 0000000..6b520ff --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/CarbonQueryPlan.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * + */ +package org.apache.carbondata.core.scan.model; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.expression.Expression; + +/** + * This class contains all the logical information about the query like dimensions,measures, + * sort order, topN etc.. 
+ */ +public class CarbonQueryPlan implements Serializable { + /** + * serialVersionUID + */ + private static final long serialVersionUID = -9036044826928017164L; + + /** + * Database name + */ + private String databaseName; + + /** + * Table name + */ + private String tableName; + + /** + * List of dimensions. + * Ex : select employee_name,department_name,sum(salary) from employee, then employee_name + * and department_name are dimensions + * If no dimensions are asked for in the query then it remains empty. + */ + private List<QueryDimension> dimensions = + new ArrayList<QueryDimension>(CarbonCommonConstants.CONSTANT_SIZE_TEN); + + /** + * List of measures. + * Ex : select employee_name,department_name,sum(salary) from employee, then sum(salary) + * would be measure. + * If no measures are asked for in the query then it remains empty. + */ + private List<QueryMeasure> measures = + new ArrayList<QueryMeasure>(CarbonCommonConstants.CONSTANT_SIZE_TEN); + + /** + * filter expression + */ + private Expression expression; + + /** + * queryId + */ + private String queryId; + + /** + * If it is raw detail query, no need to aggregate in backend. And it returns with dictionary data + * without decoding. + */ + private boolean rawDetailQuery; + + /** + * Constructor created with database name and table name. 
+ * + * @param databaseName name of the database + * @param tableName name of the table + */ + public CarbonQueryPlan(String databaseName, String tableName) { + this.tableName = tableName; + this.databaseName = databaseName; + } + + /** + * @return the dimensions + */ + public List<QueryDimension> getDimensions() { + return dimensions; + } + + public void addDimension(QueryDimension dimension) { + this.dimensions.add(dimension); + } + + /** + * @return the measures + */ + public List<QueryMeasure> getMeasures() { + return measures; + } + + public void addMeasure(QueryMeasure measure) { + this.measures.add(measure); + } + + public Expression getFilterExpression() { + return expression; + } + + public void setFilterExpression(Expression expression) { + this.expression = expression; + } + + /** + * @return the databaseName + */ + public String getDatabaseName() { + return databaseName; + } + + /** + * @return the tableName + */ + public String getTableName() { + return tableName; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public boolean isRawDetailQuery() { + return rawDetailQuery; + } + + public void setRawDetailQuery(boolean rawDetailQuery) { + this.rawDetailQuery = rawDetailQuery; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/model/QueryColumn.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryColumn.java new file mode 100644 index 0000000..e1cc825 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryColumn.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.model; + +import java.io.Serializable; + +/** + * Query column which holds the name of a column and its order in the query + */ +public class QueryColumn implements Serializable { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -4222306600480181084L; + + /** + * name of the column + */ + protected String columnName; + + /** + * query order in which the result of the query will be sent + */ + private int queryOrder; + + public QueryColumn(String columnName) { + this.columnName = columnName; + } + + /** + * @return the columnName + */ + public String getColumnName() { + return columnName; + } + + /** + * @return the queryOrder + */ + public int getQueryOrder() { + return queryOrder; + } + + /** + * @param queryOrder the queryOrder to set + */ + public void setQueryOrder(int queryOrder) { + this.queryOrder = queryOrder; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/model/QueryDimension.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryDimension.java 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryDimension.java new file mode 100644 index 0000000..2d84ee1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryDimension.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.model; + +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; + +/** + * Query plan dimension which holds the information about a dimension of the query plan; + * this is done to avoid heavy object serialization + */ +public class QueryDimension extends QueryColumn implements Serializable { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -8492704093776645651L; + /** + * actual dimension column (transient, so it is not serialized with this object) + */ + private transient CarbonDimension dimension; + + public QueryDimension(String columnName) { + super(columnName); + } + + /** + * @return the dimension + */ + public CarbonDimension getDimension() { + return dimension; + } + + /** + * @param dimension the dimension to set + */ + public void setDimension(CarbonDimension dimension) { + this.dimension = dimension; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/model/QueryMeasure.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryMeasure.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryMeasure.java new file mode 100644 index 0000000..3ca6010 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryMeasure.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.scan.model; + +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; + +/** + * Query plan measure; this class holds the information + * about a measure present in the query. This is done to avoid the serialization + * of the heavy object + */ +public class QueryMeasure extends QueryColumn implements Serializable { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = 1035512411375495414L; + + /** + * actual carbon measure object (transient, so it is not serialized with this object) + */ + private transient CarbonMeasure measure; + + public QueryMeasure(String columnName) { + super(columnName); + } + + /** + * @return the measure + */ + public CarbonMeasure getMeasure() { + return measure; + } + + /** + * @param measure the measure to set + */ + public void setMeasure(CarbonMeasure measure) { + this.measure = measure; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java new file mode 100644 index 0000000..1b295db --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.scan.model; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.AbsoluteTableIdentifier; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.update.data.UpdateVO; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.UnknownExpression; +import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression; +import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +/** + * Query model which will have all the details + * about the query. This will be sent from driver to executor. + * This will be referred to while executing the query. + */ +public class QueryModel implements Serializable { + + /** + * serialization version + */ + private static final long serialVersionUID = -4674677234007089052L; + /** + * this will hold the mapping from column to dictionary for the dictionary dimensions + * (transient, so it is not serialized with the query model) + */ + public transient Map<String, Dictionary> columnToDictionaryMapping; + /** + * list of dimensions selected in query + */ + private List<QueryDimension> queryDimension; + /** + * list of measures selected in query + */ + private List<QueryMeasure> queryMeasures; + /** + * query id + */ + private String queryId; + /** + * filter tree + */ + private FilterResolverIntf filterExpressionResolverTree; + + /** + * table block information in which query will be executed + */ + private List<TableBlockInfo> tableBlockInfos; + /** + * absolute table identifier + */ + private AbsoluteTableIdentifier absoluteTableIdentifier; + /** + * To handle most of the computation in query engines like spark and hive, carbon should give + * raw detailed records to it. 
+ */ + private boolean forcedDetailRawQuery; + /** + * table on which query will be executed + * TODO need to remove this and pass only the path + * and carbon metadata will load the table from metadata file + */ + private CarbonTable table; + + private QueryStatisticsRecorder statisticsRecorder; + + private boolean vectorReader; + + /** + * Ids of invalid segments, which need to be removed from + * memory; invalid segments can be segments which are deleted + * or compacted + */ + private List<String> invalidSegmentIds; + private Map<String, UpdateVO> invalidSegmentBlockIdMap = + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + public QueryModel() { + tableBlockInfos = new ArrayList<TableBlockInfo>(); + queryDimension = new ArrayList<QueryDimension>(); + queryMeasures = new ArrayList<QueryMeasure>(); + invalidSegmentIds = new ArrayList<>(); + } + + /** + * Creates a query model from the query plan and the carbon table: dimensions, measures, + * table, raw detail query flag and query id are filled into a new model. + * NOTE(review): the absoluteTableIdentifier argument is set on the model first, but + * fillQueryModel then overwrites it with carbonTable.getAbsoluteTableIdentifier() - + * confirm which of the two is intended. + */ + public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier, + CarbonQueryPlan queryPlan, CarbonTable carbonTable) { + QueryModel queryModel = new QueryModel(); + String factTableName = carbonTable.getFactTableName(); + queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier); + + fillQueryModel(queryPlan, carbonTable, queryModel, factTableName); + + queryModel.setForcedDetailRawQuery(queryPlan.isRawDetailQuery()); + queryModel.setQueryId(queryPlan.getQueryId()); + return queryModel; + } + + /** + * Fills the table identifier, dimensions, measures and table into the query model and, + * when the plan has a filter expression, resolves the columns of that expression. + */ + private static void fillQueryModel(CarbonQueryPlan queryPlan, CarbonTable carbonTable, + QueryModel queryModel, String factTableName) { + queryModel.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); + queryModel.setQueryDimension(queryPlan.getDimensions()); + queryModel.setQueryMeasures(queryPlan.getMeasures()); + if (null != queryPlan.getFilterExpression()) { + processFilterExpression(queryPlan.getFilterExpression(), + carbonTable.getDimensionByTableName(factTableName), + carbonTable.getMeasureByTableName(factTableName)); + } + //TODO need to remove this code, and executor 
will load the table + // from file metadata + queryModel.setTable(carbonTable); + } + + /** + * Walks the filter expression tree and binds every column expression found in it to the + * matching dimension or measure of the given lists. + * NOTE(review): the loop over filterExpression.getChildren() is not covered by the null + * check applied to getChildren() just above it - confirm getChildren() never returns null. + */ + public static void processFilterExpression(Expression filterExpression, + List<CarbonDimension> dimensions, List<CarbonMeasure> measures) { + if (null != filterExpression) { + if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) { + if (filterExpression instanceof ConditionalExpression) { + List<ColumnExpression> listOfCol = + ((ConditionalExpression) filterExpression).getColumnList(); + for (ColumnExpression expression : listOfCol) { + setDimAndMsrColumnNode(dimensions, measures, expression); + } + } + } + for (Expression expression : filterExpression.getChildren()) { + if (expression instanceof ColumnExpression) { + setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression); + } else if (expression instanceof UnknownExpression) { + UnknownExpression exp = ((UnknownExpression) expression); + List<ColumnExpression> listOfColExpression = exp.getAllColumnList(); + for (ColumnExpression col : listOfColExpression) { + setDimAndMsrColumnNode(dimensions, measures, col); + } + } else { + processFilterExpression(expression, dimensions, measures); + } + } + } + + } + + /** + * Returns the first measure whose column name equals the given name ignoring case, + * or null when no measure matches. + */ + private static CarbonMeasure getCarbonMetadataMeasure(String name, List<CarbonMeasure> measures) { + for (CarbonMeasure measure : measures) { + if (measure.getColName().equalsIgnoreCase(name)) { + return measure; + } + } + return null; + } + + /** + * Binds the column expression to the dimension returned by CarbonUtil.findDimension for its + * column name; when that is null the expression is bound to the measure looked up by the + * same name instead (which may itself be null) and is flagged as not a dimension. + */ + private static void setDimAndMsrColumnNode(List<CarbonDimension> dimensions, + List<CarbonMeasure> measures, ColumnExpression col) { + CarbonDimension dim; + CarbonMeasure msr; + String columnName; + columnName = col.getColumnName(); + dim = CarbonUtil.findDimension(dimensions, columnName); + col.setCarbonColumn(dim); + col.setDimension(dim); + col.setDimension(true); + if (null == dim) { + msr = getCarbonMetadataMeasure(columnName, measures); + col.setCarbonColumn(msr); + col.setDimension(false); + } + } + + /** + * It gets the 
projection columns, each placed at the array index given by its query order + */ + public CarbonColumn[] getProjectionColumns() { + CarbonColumn[] carbonColumns = + new CarbonColumn[getQueryDimension().size() + getQueryMeasures().size()]; + for (QueryDimension dimension : getQueryDimension()) { + carbonColumns[dimension.getQueryOrder()] = dimension.getDimension(); + } + for (QueryMeasure msr : getQueryMeasures()) { + carbonColumns[msr.getQueryOrder()] = msr.getMeasure(); + } + return carbonColumns; + } + + /** + * @return the queryDimension + */ + public List<QueryDimension> getQueryDimension() { + return queryDimension; + } + + /** + * @param queryDimension the queryDimension to set + */ + public void setQueryDimension(List<QueryDimension> queryDimension) { + this.queryDimension = queryDimension; + } + + /** + * @return the queryMeasures + */ + public List<QueryMeasure> getQueryMeasures() { + return queryMeasures; + } + + /** + * @param queryMeasures the queryMeasures to set + */ + public void setQueryMeasures(List<QueryMeasure> queryMeasures) { + this.queryMeasures = queryMeasures; + } + + /** + * @return the queryId + */ + public String getQueryId() { + return queryId; + } + + /** + * @param queryId the queryId to set + */ + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + /** + * @return the tableBlockInfos + */ + public List<TableBlockInfo> getTableBlockInfos() { + return tableBlockInfos; + } + + /** + * @param tableBlockInfos the tableBlockInfos to set + */ + public void setTableBlockInfos(List<TableBlockInfo> tableBlockInfos) { + this.tableBlockInfos = tableBlockInfos; + } + + /** + * @return the filterExpressionResolverTree + */ + public FilterResolverIntf getFilterExpressionResolverTree() { + return filterExpressionResolverTree; + } + + public void setFilterExpressionResolverTree(FilterResolverIntf filterExpressionResolverTree) { + this.filterExpressionResolverTree = filterExpressionResolverTree; + } + + /** + * @return the absoluteTableIdentifier + */ + public AbsoluteTableIdentifier 
getAbsoluteTableIdentifier() { + return absoluteTableIdentifier; + } + + /** + * @param absoluteTableIdentifier the absoluteTableIdentifier to set + */ + public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) { + this.absoluteTableIdentifier = absoluteTableIdentifier; + } + + /** + * @return the table + */ + public CarbonTable getTable() { + return table; + } + + /** + * @param table the table to set + */ + public void setTable(CarbonTable table) { + this.table = table; + } + + public boolean isForcedDetailRawQuery() { + return forcedDetailRawQuery; + } + + public void setForcedDetailRawQuery(boolean forcedDetailRawQuery) { + this.forcedDetailRawQuery = forcedDetailRawQuery; + } + + /** + * @return the columnToDictionaryMapping + */ + public Map<String, Dictionary> getColumnToDictionaryMapping() { + return columnToDictionaryMapping; + } + + /** + * @param columnToDictionaryMapping the columnToDictionaryMapping to set + */ + public void setColumnToDictionaryMapping(Map<String, Dictionary> columnToDictionaryMapping) { + this.columnToDictionaryMapping = columnToDictionaryMapping; + } + + public QueryStatisticsRecorder getStatisticsRecorder() { + return statisticsRecorder; + } + + public void setStatisticsRecorder(QueryStatisticsRecorder statisticsRecorder) { + this.statisticsRecorder = statisticsRecorder; + } + + public List<String> getInvalidSegmentIds() { + return invalidSegmentIds; + } + + public void setInvalidSegmentIds(List<String> invalidSegmentIds) { + this.invalidSegmentIds = invalidSegmentIds; + } + + public boolean isVectorReader() { + return vectorReader; + } + + public void setVectorReader(boolean vectorReader) { + this.vectorReader = vectorReader; + } + /** + * Adds every given UpdateVO to the invalid segment block id map, keyed by its segment id; + * entries already present in the map are kept unless the same segment id is put again. + */ + public void setInvalidBlockForSegmentId(List<UpdateVO> invalidSegmentTimestampList) { + for (UpdateVO anUpdateVO : invalidSegmentTimestampList) { + this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO); + } + } + + public Map<String,UpdateVO> getInvalidBlockVOForSegmentId() { + return invalidSegmentBlockIdMap; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java new file mode 100644 index 0000000..dbbb6dc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.processor; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.datastorage.FileHolder; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector; +import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector; +import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.scan.scanner.impl.FilterScanner; +import org.apache.carbondata.core.scan.scanner.impl.NonFilterScanner; + +/** + * This abstract class provides a skeletal implementation of the + * Block iterator. 
+ */ +public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Object[]>> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractDataBlockIterator.class.getName()); + + /** + * iterator which will be used to iterate over data blocks + */ + protected CarbonIterator<DataRefNode> dataBlockIterator; + + /** + * result collector which will be used to aggregate the scanned result + */ + protected ScannedResultCollector scannerResultAggregator; + + /** + * scanner which will be used to process the block; processing can be + * filter processing or non filter processing + */ + protected BlockletScanner blockletScanner; + + /** + * to hold the data block + */ + protected BlocksChunkHolder blocksChunkHolder; + + /** + * batch size of result + */ + protected int batchSize; + + /** + * result of the most recent blocklet scan; null until updateScanner() has scanned a block + */ + protected AbstractScannedResult scannedResult; + + /** + * Sets up the blocklet iterator, the chunk holder with its file reader, the scanner + * (filter scanner when a filter executer tree is present, non filter scanner otherwise) + * and the result collector (raw, vector based dictionary or row based dictionary). + */ + public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo, + FileHolder fileReader, int batchSize, QueryStatisticsModel queryStatisticsModel) { + dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(), + blockExecutionInfo.getNumberOfBlockToScan()); + blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(), + blockExecutionInfo.getTotalNumberOfMeasureBlock()); + blocksChunkHolder.setFileReader(fileReader); + + if (blockExecutionInfo.getFilterExecuterTree() != null) { + blockletScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel); + } else { + blockletScanner = new NonFilterScanner(blockExecutionInfo, queryStatisticsModel); + } + if (blockExecutionInfo.isRawRecordDetailQuery()) { + LOGGER.info("Row based raw collector is used to scan and collect the data"); + this.scannerResultAggregator = + new RawBasedResultCollector(blockExecutionInfo); + } else if (blockExecutionInfo.isVectorBatchCollector()) { + LOGGER.info("Vector based dictionary collector is used to scan and collect the data"); + 
this.scannerResultAggregator = + new DictionaryBasedVectorResultCollector(blockExecutionInfo); + } else { + LOGGER.info("Row based dictionary collector is used to scan and collect the data"); + this.scannerResultAggregator = + new DictionaryBasedResultCollector(blockExecutionInfo); + } + this.batchSize = batchSize; + } + + /** + * @return true when the current scanned result still has data or when the data block + * iterator has more blocks; a remaining block is not scanned here, that is done + * by updateScanner() + */ + public boolean hasNext() { + if (scannedResult != null && scannedResult.hasNext()) { + return true; + } else { + return dataBlockIterator.hasNext(); + } + } + + /** + * Makes sure scannedResult points to a result that has data, scanning further blocks + * for as long as the scanned results have none. IOException and + * FilterUnsupportedException are rethrown wrapped in a RuntimeException. + * + * @return true if a scanned result with data is available, false when no block is left + */ + protected boolean updateScanner() { + try { + if (scannedResult != null && scannedResult.hasNext()) { + return true; + } else { + scannedResult = getNextScannedResult(); + while (scannedResult != null) { + if (scannedResult.hasNext()) { + return true; + } + scannedResult = getNextScannedResult(); + } + return false; + } + } catch (IOException | FilterUnsupportedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Hands the next data block to the chunk holder, resets the holder and scans the block. + * + * @return the scanned result, or null when the data block iterator has no more blocks + */ + private AbstractScannedResult getNextScannedResult() + throws IOException, FilterUnsupportedException { + if (dataBlockIterator.hasNext()) { + blocksChunkHolder.setDataBlock(dataBlockIterator.next()); + blocksChunkHolder.reset(); + return blockletScanner.scanBlocklet(blocksChunkHolder); + } + return null; + } + + /** + * To be implemented by subclasses to process the next batch for the given columnar batch + */ + public abstract void processNextBatch(CarbonColumnarBatch columnarBatch); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockletIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockletIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockletIterator.java new file mode 100644 index 0000000..f06aa6a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockletIterator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.scan.processor; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.DataRefNode; + +/** + * Iterates over a chain of data blocks: starts from a given block and follows + * getNextDataRefNode() until the chain ends or the assigned number of blocks is reached + */ +public class BlockletIterator extends CarbonIterator<DataRefNode> { + /** + * data block that the next call to next() will return + */ + protected DataRefNode datablock; + /** + * block counter to keep track of how many blocks have been processed + */ + private int blockCounter; + + /** + * flag telling whether any more data block is present; starts as true and is + * only cleared inside next() + */ + private boolean hasNext = true; + + /** + * total number of blocks assigned to this iterator + */ + private long totalNumberOfBlocksToScan; + + /** + * Constructor + * + * @param datablock first data block + * @param totalNumberOfBlocksToScan total number of blocks to be scanned + */ + public BlockletIterator(DataRefNode datablock, long totalNumberOfBlocksToScan) { + this.datablock = datablock; + this.totalNumberOfBlocksToScan = totalNumberOfBlocksToScan; + } + + /** + * @return true until next() has cleared the flag, i.e. until the block chain has + * ended or the assigned number of blocks has been returned + */ + @Override public boolean hasNext() { + return hasNext; + } + + @Override + /** + * To get the next block + * @return next data block + * NOTE(review): hasNext() is not checked here; once datablock is null the + * getNextDataRefNode() call below would fail, so callers must test hasNext() first + */ + public DataRefNode next() { + // get the 
current blocks + DataRefNode datablockTemp = datablock; + // store the next data block + datablock = datablock.getNextDataRefNode(); + // increment the counter + blockCounter++; + // if all the data block is processed then + // set the has next flag to false + // or if number of blocks assigned to this iterator is processed + // then also set the hasnext flag to false + if (null == datablock || blockCounter >= this.totalNumberOfBlocksToScan) { + hasNext = false; + } + return datablockTemp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java new file mode 100644 index 0000000..81628b3 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.processor; + +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; +import org.apache.carbondata.core.datastorage.FileHolder; + +/** + * Block chunk holder which holds the dimension and + * measure chunk arrays together with the file reader and the current data block + */ +public class BlocksChunkHolder { + + /** + * dimension column data chunks; the array is sized in the constructor + */ + private DimensionColumnDataChunk[] dimensionDataChunk; + + /** + * measure column data chunks; the array is sized in the constructor + */ + private MeasureColumnDataChunk[] measureDataChunk; + + /** + * file reader which will be used to read the block from file + */ + private FileHolder fileReader; + + /** + * data block + */ + private DataRefNode dataBlock; + /** + * Allocates the two chunk arrays with the given lengths + * + * @param numberOfDimensionBlock length of the dimension chunk array + * @param numberOfMeasureBlock length of the measure chunk array + */ + public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock) { + dimensionDataChunk = new DimensionColumnDataChunk[numberOfDimensionBlock]; + measureDataChunk = new MeasureColumnDataChunk[numberOfMeasureBlock]; + } + + /** + * @return the dimensionDataChunk + */ + public DimensionColumnDataChunk[] getDimensionDataChunk() { + return dimensionDataChunk; + } + + /** + * @param dimensionDataChunk the dimensionDataChunk to set + */ + public void setDimensionDataChunk(DimensionColumnDataChunk[] dimensionDataChunk) { + this.dimensionDataChunk = dimensionDataChunk; + } + + /** + * @return the measureDataChunk + */ + public MeasureColumnDataChunk[] getMeasureDataChunk() { + return measureDataChunk; + } + + /** + * @param measureDataChunk the measureDataChunk to set + */ + public void setMeasureDataChunk(MeasureColumnDataChunk[] measureDataChunk) { + this.measureDataChunk = measureDataChunk; + } + + /** + * @return the fileReader + */ + public FileHolder getFileReader() { + return fileReader; + } + + /** + * @param fileReader the fileReader to set + */ + public void setFileReader(FileHolder fileReader) { + this.fileReader = fileReader; + } + + /** + * 
@return the dataBlock + */ + public DataRefNode getDataBlock() { + return dataBlock; + } + + /** + * @param dataBlock the dataBlock to set + */ + public void setDataBlock(DataRefNode dataBlock) { + this.dataBlock = dataBlock; + } + + /** + * Sets every slot of the measure chunk and dimension chunk + * arrays to null; the arrays themselves are kept + */ + public void reset() { + for (int i = 0; i < measureDataChunk.length; i++) { + this.measureDataChunk[i] = null; + } + for (int i = 0; i < dimensionDataChunk.length; i++) { + this.dimensionDataChunk[i] = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java new file mode 100644 index 0000000..4de9685 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.processor.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.datastorage.FileHolder; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; + +/** + * Below class will be used to process the block for detail query + */ +public class DataBlockIteratorImpl extends AbstractDataBlockIterator { + /** + * DataBlockIteratorImpl Constructor; all arguments are passed on to the super class + * + * @param blockExecutionInfo execution information + * @param fileReader file reader used to read the blocks + * @param batchSize batch size of result + * @param queryStatisticsModel query statistics model + */ + public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader, + int batchSize, QueryStatisticsModel queryStatisticsModel) { + super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel); + } + + /** + * Collects rows from the current scanned result and keeps moving on through + * updateScanner() while fewer than batchSize rows have been collected + * + * @return the collected rows; a new empty list when updateScanner() finds no result + */ + public List<Object[]> next() { + List<Object[]> collectedResult = null; + if (updateScanner()) { + collectedResult = this.scannerResultAggregator.collectData(scannedResult, batchSize); + while (collectedResult.size() < batchSize && updateScanner()) { + List<Object[]> data = this.scannerResultAggregator + .collectData(scannedResult, batchSize - collectedResult.size()); + collectedResult.addAll(data); + } + } else { + collectedResult = new ArrayList<>(); + } + return collectedResult; + } + /** + * Fills the given columnar batch from the scanned results, moving on through + * updateScanner() while the batch actual size is below its batch size + * + * @param columnarBatch batch to be filled; left untouched when no result is available + */ + public void processNextBatch(CarbonColumnarBatch columnarBatch) { + if (updateScanner()) { + this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch); + while (columnarBatch.getActualSize() < columnarBatch.getBatchSize() && updateScanner()) { + this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch); + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java new file mode 100644 index 0000000..82649b4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.result; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.carbondata.core.update.data.BlockletLevelDeleteDeltaDataCache; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; +import org.apache.carbondata.core.path.CarbonTablePath; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; + +/** + * Scanned result class which will store and provide the result on request + */ +public abstract class AbstractScannedResult { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractScannedResult.class.getName()); + /** + * current row number; starts at -1 and is set back to -1 by reset() + */ + protected int currentRow = -1; + /** + * row mapping indexes, i.e. the valid row indexes given to setIndexes(int[]) + */ + protected int[] rowMapping; + /** + * key size of the fixed length column + */ + private int fixedLengthKeySize; + /** + * total number of rows + */ + private int totalNumberOfRows; + /** + * to keep track of the number of rows processed + */ + protected int rowCounter; + /** + * dimension column data chunk + */ + protected DimensionColumnDataChunk[] dataChunks; + /** + * measure column data chunk + */ + protected MeasureColumnDataChunk[] measureDataChunks; + /** + * dictionary column block index in file + */ + protected int[] dictionaryColumnBlockIndexes; + + /** + * no 
dictionary column block index in file + */ + protected int[] noDictionaryColumnBlockIndexes; + + /** + * column group to its key structure info + * which will be used to get the key from the complete + * column group key + * For example if only one dimension of the column group is selected + * then from complete column group key it will be used to mask the key and + * get the particular column key + */ + protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo; + + /** + * complex parent block index to its query type; looked up with the entries of + * complexParentBlockIndexes in getComplexTypeKeyArray(int) + */ + private Map<Integer, GenericQueryType> complexParentIndexToQueryMap; + /** + * number of query dimensions; length of the array built by + * getDictionaryKeyIntegerArray(int) + */ + private int totalDimensionsSize; + + /** + * blockletId which will be blockId + blocklet number in the block + */ + private String blockletId; + + private long rowId; + + /** + * parent block indexes + */ + private int[] complexParentBlockIndexes; + + protected BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache; + /** + * Copies the key size, the block index arrays and the lookup maps out of the + * given block execution info + * + * @param blockExecutionInfo block execution info of the query + */ + public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) { + this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize(); + this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes(); + this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex(); + this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo(); + this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap(); + this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes(); + this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length; + } + + /** + * Below method will be used to set the dimension chunks + * which will be used to create a row + * + * @param dataChunks dimension chunks used in query + */ + public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) { + this.dataChunks = dataChunks; + } + + /** + * Below method will be used to set the measure column chunks + * + * @param measureDataChunks measure data chunks + */ + public void 
setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) { + this.measureDataChunks = measureDataChunks; + } + + /** + * Below method will be used to get the chunk based on measure ordinal + * + * @param ordinal measure ordinal + * @return measure column chunk + */ + public MeasureColumnDataChunk getMeasureChunk(int ordinal) { + return measureDataChunks[ordinal]; + } + + /** + * Below method will be used to get the key for all the dictionary dimensions + * which is present in the query + * + * @param rowId row id selected after scanning + * @return the dictionary key; as a side effect rowCounter is incremented + */ + protected byte[] getDictionaryKeyArray(int rowId) { + byte[] completeKey = new byte[fixedLengthKeySize]; + int offset = 0; + for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { + offset += dataChunks[dictionaryColumnBlockIndexes[i]] + .fillChunkData(completeKey, offset, rowId, + columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + } + rowCounter++; + return completeKey; + } + + /** + * Below method will be used to get the key for all the dictionary dimensions + * in integer array format which is present in the query + * + * @param rowId row id selected after scanning + * @return the dictionary key; as a side effect rowCounter is incremented + */ + protected int[] getDictionaryKeyIntegerArray(int rowId) { + int[] completeKey = new int[totalDimensionsSize]; + int column = 0; + for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { + column = dataChunks[dictionaryColumnBlockIndexes[i]] + .fillConvertedChunkData(rowId, column, completeKey, + columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + } + rowCounter++; + return completeKey; + } + + /** + * Fill the column data of dictionary to vector + */ + public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { + column = dataChunks[dictionaryColumnBlockIndexes[i]] + .fillConvertedChunkData(vectorInfo, column, + 
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + } + } + + /** + * Fill the no dictionary column data to vector + */ + public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { + column = dataChunks[noDictionaryColumnBlockIndexes[i]] + .fillConvertedChunkData(vectorInfo, column, + columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i])); + } + } + + /** + * Fill the measure column data to vector + */ + public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) { + for (int i = 0; i < measuresOrdinal.length; i++) { + vectorInfo[i].measureVectorFiller + .fillMeasureVector(measureDataChunks[measuresOrdinal[i]], vectorInfo[i]); + } + } + /** + * Fill the complex column data to vector: for every row between offset and + * offset + size of a vector info the complex value is parsed from the data chunks + * and put into the vector, starting at vectorOffset + * NOTE(review): an IOException is only logged; putObject and the vectorOffset + * increment are then skipped for that row, so later rows land one slot earlier + */ + public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) { + for (int i = 0; i < vectorInfos.length; i++) { + int offset = vectorInfos[i].offset; + int len = offset + vectorInfos[i].size; + int vectorOffset = vectorInfos[i].vectorOffset; + CarbonColumnVector vector = vectorInfos[i].vector; + for (int j = offset; j < len; j++) { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(byteStream); + try { + vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, + rowMapping == null ? j : rowMapping[j], dataOutput); + Object data = vectorInfos[i].genericQueryType + .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray())); + vector.putObject(vectorOffset++, data); + } catch (IOException e) { + LOGGER.error(e); + } finally { + CarbonUtil.closeStreams(dataOutput); + CarbonUtil.closeStreams(byteStream); + } + } + } + } + + /** + * Just increment rowCounter and currentRow in case of query only on measures. + */ + public void incrementCounter() { + rowCounter++; + currentRow++; + } + + /** + * set the row counter to the given value. 
+ */ + public void setRowCounter(int rowCounter) { + this.rowCounter = rowCounter; + } + + /** + * Below method will be used to get the dimension data based on dimension + * ordinal and index + * + * @param dimOrdinal dimension ordinal present in the query + * @param rowId row index + * @return dimension data based on row id + */ + protected byte[] getDimensionData(int dimOrdinal, int rowId) { + return dataChunks[dimOrdinal].getChunkData(rowId); + } + + /** + * Below method will be used to get the dimension key array + * for all the no dictionary dimension present in the query + * + * @param rowId row number + * @return no dictionary keys for all no dictionary dimension + */ + protected byte[][] getNoDictionaryKeyArray(int rowId) { + byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnBlockIndexes.length][]; + int position = 0; + for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { + noDictionaryColumnsKeys[position++] = + dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId); + } + return noDictionaryColumnsKeys; + } + + /** + * Below method will be used to get the dimension key array as strings + * for all the no dictionary dimension present in the query + * NOTE(review): new String(byte[]) decodes with the platform default charset; + * confirm this matches the charset the chunk data was written with + * + * @param rowId row number + * @return no dictionary keys for all no dictionary dimension + */ + protected String[] getNoDictionaryKeyStringArray(int rowId) { + String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length]; + int position = 0; + for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { + noDictionaryColumnsKeys[position++] = + new String(dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId)); + } + return noDictionaryColumnsKeys; + } + + /** + * @return blockletId + */ + public String getBlockletId() { + return blockletId; + } + + /** + * @param blockletId id that is stored after CarbonTablePath.getShortBlockId is applied + */ + public void setBlockletId(String blockletId) { + this.blockletId = CarbonTablePath.getShortBlockId(blockletId); + } + + /** + * @return rowId + */ + 
public long getRowId() { + return rowId; + } + + /** + * @param rowId the rowId to set + */ + public void setRowId(long rowId) { + this.rowId = rowId; + } + + + /** + * Below method will be used to get the complex type keys array based + * on row id for all the complex type dimension selected in query + * + * @param rowId row number + * @return complex type key array for all the complex dimension selected in query; + * an entry stays null when writing that column fails with an IOException, + * which is only logged + */ + protected byte[][] getComplexTypeKeyArray(int rowId) { + byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][]; + for (int i = 0; i < complexTypeData.length; i++) { + GenericQueryType genericQueryType = + complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(byteStream); + try { + genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, rowId, dataOutput); + complexTypeData[i] = byteStream.toByteArray(); + } catch (IOException e) { + LOGGER.error(e); + } finally { + CarbonUtil.closeStreams(dataOutput); + CarbonUtil.closeStreams(byteStream); + } + } + return complexTypeData; + } + + /** + * @return the total number of rows after scanning + */ + public int numberOfOutputRows() { + return this.totalNumberOfRows; + } + + /** + * to check whether any more row is present in the result + * + * @return true while rowCounter is below the total number of rows; otherwise the + * chunks are passed to CarbonUtil.freeMemory and false is returned + */ + public boolean hasNext() { + if (rowCounter < this.totalNumberOfRows) { + return true; + } + CarbonUtil.freeMemory(dataChunks, measureDataChunks); + return false; + } + + /** + * As this class will be a flyweight object so + * for one block all the blocklet scanning will use same result object + * in that case we need to reset the counter to zero so + * for new result it will give the result from zero + */ + public void reset() { + rowCounter = 0; + currentRow = -1; + } + + /** + * @param totalNumberOfRows total number of rows valid after scanning + */ + public void setNumberOfRows(int 
totalNumberOfRows) { + this.totalNumberOfRows = totalNumberOfRows; + } + + /** + * After applying filter it will return the bit set with the valid row indexes + * so below method will be used to set the row indexes + * + * @param indexes valid row indexes, stored as rowMapping + */ + public void setIndexes(int[] indexes) { + this.rowMapping = indexes; + } + + /** + * Below method will be used to check whether measure value is null or not + * + * @param ordinal measure ordinal + * @param rowIndex row number to be checked + * @return whether it is null or not + */ + protected boolean isNullMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex); + } + + /** + * Below method will be used to get the measure value of + * long type + * + * @param ordinal measure ordinal + * @param rowIndex row number of the measure value + * @return measure value of long type + */ + protected long getLongMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex); + } + + /** + * Below method will be used to get the measure value of double type + * + * @param ordinal measure ordinal + * @param rowIndex row number + * @return measure value of double type + */ + protected double getDoubleMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder() + .getReadableDoubleValueByIndex(rowIndex); + } + + /** + * Below method will be used to get the measure value of big decimal data type + * + * @param ordinal ordinal of the measure + * @param rowIndex row number + * @return measure of big decimal type + */ + protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) { + return measureDataChunks[ordinal].getMeasureDataHolder() + .getReadableBigDecimalValueByIndex(rowIndex); + } + + public int getRowCounter() { + return rowCounter; + } + + /** + * will return the current valid row id + * + * @return valid row id + */ + public 
abstract int getCurrenrRowId(); + + /** + * @return dictionary key array for all the dictionary dimension + * selected in query + */ + public abstract byte[] getDictionaryKeyArray(); + + /** + * @return dictionary key array for all the dictionary dimension in integer array format + * selected in query + */ + public abstract int[] getDictionaryKeyIntegerArray(); + + /** + * Return the dimension data based on dimension ordinal + * + * @param dimensionOrdinal dimension ordinal + * @return dimension data + */ + public abstract byte[] getDimensionKey(int dimensionOrdinal); + + /** + * Below method will be used to get the complex type key array + * + * @return complex type key array + */ + public abstract byte[][] getComplexTypeKeyArray(); + + /** + * Below method will be used to get the no dictionary key + * array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + public abstract byte[][] getNoDictionaryKeyArray(); + + /** + * Below method will be used to get the no dictionary key + * array in string array format for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + public abstract String[] getNoDictionaryKeyStringArray(); + + /** + * Below method will be used to check whether the measure value + * is null for a measure + * + * @param ordinal measure ordinal + * @return is null or not + */ + public abstract boolean isNullMeasureValue(int ordinal); + + /** + * Below method will be used to get the measure value for measure + * of long data type + * + * @param ordinal measure ordinal + * @return long value of measure + */ + public abstract long getLongMeasureValue(int ordinal); + + /** + * Below method will be used to get the value of measure of double + * type + * + * @param ordinal measure ordinal + * @return measure value + */ + public abstract double getDoubleMeasureValue(int ordinal); + + /** + * 
Below method will be used to get the data of big decimal type + * of a measure + * + * @param ordinal measure ordinal + * @return measure value + */ + public abstract BigDecimal getBigDecimalMeasureValue(int ordinal); + + /** + * + * @return BlockletLevelDeleteDeltaDataCache. + */ + public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { + return blockletDeleteDeltaCache; + } + + /** + * @param blockletDeleteDeltaCache the blocklet level delete delta cache to set + */ + public void setBlockletDeleteDeltaCache( + BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) { + this.blockletDeleteDeltaCache = blockletDeleteDeltaCache; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java new file mode 100644 index 0000000..1c805ae --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.result; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.carbondata.common.CarbonIterator; + +/** + * Below class holds the query result rows and iterates over them one row at a time + */ +public class BatchResult extends CarbonIterator<Object[]> { + + /** + * list of rows held by this result + */ + protected List<Object[]> rows; + + /** + * counter to check whether all the records are processed or not; + * it is the index of the row that next() returns + */ + protected int counter; + /** + * Creates a result with an empty list of rows + */ + public BatchResult() { + this.rows = new ArrayList<>(); + } + + /** + * Below method will be used to get the rows + * + * @return the list of rows held by this result (the list itself, not a copy) + */ + public List<Object[]> getRows() { + return rows; + } + + /** + * Below method will be used to set the rows + * + * @param rows list that replaces the current rows; the counter is not reset + */ + public void setRows(List<Object[]> rows) { + this.rows = rows; + } + + /** + * This method will return one row at a time based on the counter given. + * @param counter index of the row to return + * @return the row at that index; the iteration counter is not changed + */ + public Object[] getRawRow(int counter) { + return rows.get(counter); + } + + /** + * For getting the total size. + * @return number of rows held by this result + */ + public int getSize() { + return rows.size(); + } + + + /** + * Returns {@code true} if the iteration has more elements. + * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + return counter < rows.size(); + } + + /** + * Returns the next element in the iteration. 
+ * + * @return the next element in the iteration + */ + @Override public Object[] next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Object[] row = rows.get(counter); + counter++; + return row; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java new file mode 100644 index 0000000..7fc964b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.result.impl; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; + +/** + * Result provider class in case of filter query + * In case of filter query data will be send + * based on filtered row index + */ +public class FilterQueryScannedResult extends AbstractScannedResult { + + public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) { + super(tableBlockExecutionInfos); + } + + /** + * @return dictionary key array for all the dictionary dimension + * selected in query + */ + @Override public byte[] getDictionaryKeyArray() { + ++currentRow; + return getDictionaryKeyArray(rowMapping[currentRow]); + } + + /** + * @return dictionary key integer array for all the dictionary dimension + * selected in query + */ + @Override public int[] getDictionaryKeyIntegerArray() { + ++currentRow; + return getDictionaryKeyIntegerArray(rowMapping[currentRow]); + } + + /** + * Below method will be used to get the complex type key array + * + * @return complex type key array + */ + @Override public byte[][] getComplexTypeKeyArray() { + return getComplexTypeKeyArray(rowMapping[currentRow]); + } + + /** + * Below method will be used to get the no dictionary key + * array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + @Override public byte[][] getNoDictionaryKeyArray() { + return getNoDictionaryKeyArray(rowMapping[currentRow]); + } + + /** + * Below method will be used to get the no dictionary key + * string array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + @Override public String[] getNoDictionaryKeyStringArray() { + return 
getNoDictionaryKeyStringArray(rowMapping[currentRow]); + } + + /** + * will return the current valid row id + * + * @return valid row id + */ + @Override public int getCurrenrRowId() { + return rowMapping[currentRow]; + } + + /** + * Return the dimension data based on dimension ordinal + * + * @param dimensionOrdinal dimension ordinal + * @return dimension data + */ + @Override public byte[] getDimensionKey(int dimensionOrdinal) { + return getDimensionData(dimensionOrdinal, rowMapping[currentRow]); + } + + /** + * Below method will be used to to check whether measure value + * is null or for a measure + * + * @param ordinal measure ordinal + * @return is null or not + */ + @Override public boolean isNullMeasureValue(int ordinal) { + return isNullMeasureValue(ordinal, rowMapping[currentRow]); + } + + /** + * Below method will be used to get the measure value for measure + * of long data type + * + * @param ordinal measure ordinal + * @return long value of measure + */ + @Override public long getLongMeasureValue(int ordinal) { + return getLongMeasureValue(ordinal, rowMapping[currentRow]); + } + + /** + * Below method will be used to get the value of measure of double + * type + * + * @param ordinal measure ordinal + * @return measure value + */ + @Override public double getDoubleMeasureValue(int ordinal) { + return getDoubleMeasureValue(ordinal, rowMapping[currentRow]); + } + + /** + * Below method will be used to get the data of big decimal type + * of a measure + * + * @param ordinal measure ordinal + * @return measure value + */ + @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) { + return getBigDecimalMeasureValue(ordinal, rowMapping[currentRow]); + } + + /** + * Fill the column data to vector + */ + public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { + column = dataChunks[dictionaryColumnBlockIndexes[i]] + 
.fillConvertedChunkData(rowMapping, vectorInfo, column, + columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + } + } + + /** + * Fill the column data to vector + */ + public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { + column = dataChunks[noDictionaryColumnBlockIndexes[i]] + .fillConvertedChunkData(rowMapping, vectorInfo, column, + columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i])); + } + } + + /** + * Fill the measure column data to vector + */ + public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) { + for (int i = 0; i < measuresOrdinal.length; i++) { + vectorInfo[i].measureVectorFiller + .fillMeasureVectorForFilter(rowMapping, measureDataChunks[measuresOrdinal[i]], + vectorInfo[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java new file mode 100644 index 0000000..5cfea25 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.scan.result.impl; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.AbstractScannedResult; + +/** + * Result provide class for non filter query + * In case of no filter query we need to return + * complete data + */ +public class NonFilterQueryScannedResult extends AbstractScannedResult { + + public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) { + super(blockExecutionInfo); + } + + /** + * @return dictionary key array for all the dictionary dimension selected in + * query + */ + @Override public byte[] getDictionaryKeyArray() { + ++currentRow; + return getDictionaryKeyArray(currentRow); + } + + /** + * @return dictionary key integer array for all the dictionary dimension + * selected in query + */ + @Override public int[] getDictionaryKeyIntegerArray() { + ++currentRow; + return getDictionaryKeyIntegerArray(currentRow); + } + + /** + * Below method will be used to get the complex type key array + * + * @return complex type key array + */ + @Override public byte[][] getComplexTypeKeyArray() { + return getComplexTypeKeyArray(currentRow); + } + + /** + * Below method will be used to get the no dictionary key array for all the + * no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + @Override public byte[][] getNoDictionaryKeyArray() { + return getNoDictionaryKeyArray(currentRow); + } + + /** + * Below method will 
be used to get the no dictionary key + * string array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + @Override public String[] getNoDictionaryKeyStringArray() { + return getNoDictionaryKeyStringArray(currentRow); + } + + /** + * will return the current valid row id + * + * @return valid row id + */ + @Override public int getCurrenrRowId() { + return currentRow; + } + + /** + * Return the dimension data based on dimension ordinal + * + * @param dimensionOrdinal dimension ordinal + * @return dimension data + */ + @Override public byte[] getDimensionKey(int dimensionOrdinal) { + return getDimensionData(dimensionOrdinal, currentRow); + } + + /** + * Below method will be used to to check whether measure value is null or + * for a measure + * + * @param ordinal measure ordinal + * @return is null or not + */ + @Override public boolean isNullMeasureValue(int ordinal) { + return isNullMeasureValue(ordinal, currentRow); + } + + /** + * Below method will be used to get the measure value for measure of long + * data type + * + * @param ordinal measure ordinal + * @return long value of measure + */ + @Override public long getLongMeasureValue(int ordinal) { + return getLongMeasureValue(ordinal, currentRow); + } + + /** + * Below method will be used to get the value of measure of double type + * + * @param ordinal measure ordinal + * @return measure value + */ + @Override public double getDoubleMeasureValue(int ordinal) { + return getDoubleMeasureValue(ordinal, currentRow); + } + + /** + * Below method will be used to get the data of big decimal type of a + * measure + * + * @param ordinal measure ordinal + * @return measure value + */ + @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) { + return getBigDecimalMeasureValue(ordinal, currentRow); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java new file mode 100644 index 0000000..88c0144 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.scan.result.iterator; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.DataRefNodeFinder; +import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.FileHolder; +import org.apache.carbondata.core.datastorage.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator; +import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; + +/** + * In case of detail query we cannot keep all the records in memory so for + * executing that query are returning a iterator over block and every time next + * call will come it will execute the block and return the result + */ +public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> { + + /** + * LOGGER. 
+ */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName()); + + protected ExecutorService execService; + /** + * execution info of the block + */ + protected List<BlockExecutionInfo> blockExecutionInfos; + + /** + * file reader which will be used to execute the query + */ + protected FileHolder fileReader; + protected AbstractDataBlockIterator dataBlockIterator; + protected boolean nextBatch = false; + /** + * total time scan the blocks + */ + protected long totalScanTime; + /** + * is the statistic recorded + */ + protected boolean isStatisticsRecorded; + /** + * QueryStatisticsRecorder + */ + protected QueryStatisticsRecorder recorder; + /** + * number of cores which can be used + */ + private int batchSize; + /** + * queryStatisticsModel to store query statistics object + */ + QueryStatisticsModel queryStatisticsModel; + + public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { + String batchSizeString = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE); + if (null != batchSizeString) { + try { + batchSize = Integer.parseInt(batchSizeString); + } catch (NumberFormatException ne) { + LOGGER.error("Invalid inmemory records size. 
Using default value"); + batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT; + } + } else { + batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT; + } + this.recorder = queryModel.getStatisticsRecorder(); + this.blockExecutionInfos = infos; + this.fileReader = FileFactory.getFileHolder( + FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath())); + this.execService = execService; + intialiseInfos(); + initQueryStatiticsModel(); + } + + private void intialiseInfos() { + totalScanTime = System.currentTimeMillis(); + for (BlockExecutionInfo blockInfo : blockExecutionInfos) { + DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize()); + DataRefNode startDataBlock = finder + .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); + while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { + startDataBlock = startDataBlock.getNextDataRefNode(); + } + + long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan(); + //if number of block is less than 0 then take end block. 
+ if (numberOfBlockToScan <= 0) { + DataRefNode endDataBlock = finder + .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey()); + numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1; + } + blockInfo.setFirstDataBlock(startDataBlock); + blockInfo.setNumberOfBlockToScan(numberOfBlockToScan); + } + } + + @Override public boolean hasNext() { + if ((dataBlockIterator != null && dataBlockIterator.hasNext()) || nextBatch) { + return true; + } else if (blockExecutionInfos.size() > 0) { + return true; + } else { + if (!isStatisticsRecorded) { + QueryStatistic statistic = new QueryStatistic(); + statistic.addFixedTimeStatistic(QueryStatisticsConstants.SCAN_BLOCKS_TIME, + System.currentTimeMillis() - totalScanTime); + recorder.recordStatistics(statistic); + isStatisticsRecorded = true; + } + return false; + } + } + + protected void updateDataBlockIterator() { + if (dataBlockIterator == null || !dataBlockIterator.hasNext()) { + dataBlockIterator = getDataBlockIterator(); + while (dataBlockIterator != null && !dataBlockIterator.hasNext()) { + dataBlockIterator = getDataBlockIterator(); + } + } + } + + private DataBlockIteratorImpl getDataBlockIterator() { + if (blockExecutionInfos.size() > 0) { + BlockExecutionInfo executionInfo = blockExecutionInfos.get(0); + blockExecutionInfos.remove(executionInfo); + queryStatisticsModel.setRecorder(recorder); + return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel); + } + return null; + } + + protected void initQueryStatiticsModel() { + this.queryStatisticsModel = new QueryStatisticsModel(); + QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet); + QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + 
.put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet); + } + + public void processNextBatch(CarbonColumnarBatch columnarBatch) { + throw new UnsupportedOperationException("Please use VectorDetailQueryResultIterator"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java new file mode 100644 index 0000000..61518b2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.scan.result.iterator; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.scan.result.BatchResult; + +/** + * Iterator over row result + */ +public class ChunkRowIterator extends CarbonIterator<Object[]> { + + /** + * iterator over chunk result + */ + private CarbonIterator<BatchResult> iterator; + + /** + * currect chunk + */ + private BatchResult currentchunk; + + public ChunkRowIterator(CarbonIterator<BatchResult> iterator) { + this.iterator = iterator; + if (iterator.hasNext()) { + currentchunk = iterator.next(); + } + } + + /** + * Returns {@code true} if the iteration has more elements. (In other words, + * returns {@code true} if {@link #next} would return an element rather than + * throwing an exception.) + * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + if (null != currentchunk) { + if ((currentchunk.hasNext())) { + return true; + } else if (!currentchunk.hasNext()) { + while (iterator.hasNext()) { + currentchunk = iterator.next(); + if (currentchunk != null && currentchunk.hasNext()) { + return true; + } + } + } + } + return false; + } + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration + */ + @Override public Object[] next() { + return currentchunk.next(); + } + +}