Fixed synchronization issue and improved IUD (insert/update/delete) performance
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da952e82 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da952e82 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da952e82 Branch: refs/heads/branch-1.1 Commit: da952e82b443839e9c8b7fdeebaed092d3232652 Parents: bbf5dc1 Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Mon Jun 12 16:06:24 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 13:32:15 2017 +0530 ---------------------------------------------------------------------- .../core/datastore/block/AbstractIndex.java | 41 ++++++++ .../core/datastore/block/TableBlockInfo.java | 22 +++- .../core/mutate/CarbonUpdateUtil.java | 16 +++ .../core/mutate/DeleteDeltaBlockletDetails.java | 15 +-- .../carbondata/core/mutate/DeleteDeltaVo.java | 60 +++++++++++ .../reader/CarbonDeleteFilesDataReader.java | 47 +++++++++ .../impl/DictionaryBasedResultCollector.java | 11 +- .../collector/impl/RawBasedResultCollector.java | 7 +- ...structureBasedDictionaryResultCollector.java | 7 +- .../RestructureBasedRawResultCollector.java | 7 +- .../executor/impl/AbstractQueryExecutor.java | 9 +- .../scan/executor/infos/BlockExecutionInfo.java | 56 ++++++---- .../scan/executor/infos/DeleteDeltaInfo.java | 82 +++++++++++++++ .../core/scan/result/AbstractScannedResult.java | 61 +++++++---- .../AbstractDetailQueryResultIterator.java | 103 ++++++++++++++++++- .../scan/scanner/AbstractBlockletScanner.java | 9 -- .../core/scan/scanner/impl/FilterScanner.java | 10 -- .../SegmentUpdateStatusManager.java | 29 ++++-- .../datastore/SegmentTaskIndexStoreTest.java | 2 +- .../core/datastore/block/BlockInfoTest.java | 12 +-- .../datastore/block/TableBlockInfoTest.java | 32 +++--- .../core/datastore/block/TableTaskInfoTest.java | 8 +- .../carbondata/core/util/CarbonUtilTest.java | 4 +- .../core/util/DataFileFooterConverterTest.java | 8 +- 
.../carbondata/hadoop/CarbonInputFormat.java | 11 +- .../carbondata/hadoop/CarbonInputSplit.java | 39 +++++-- .../internal/index/impl/InMemoryBTreeIndex.java | 5 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../carbon/datastore/BlockIndexStoreTest.java | 28 ++--- 31 files changed, 574 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java index b538dc3..4d0e56d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java @@ -17,11 +17,13 @@ package org.apache.carbondata.core.datastore.block; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.carbondata.core.cache.Cacheable; import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; public abstract class AbstractIndex implements Cacheable { @@ -51,6 +53,16 @@ public abstract class AbstractIndex implements Cacheable { protected long memorySize; /** + * last fetch delete deltaFile timestamp + */ + private long deleteDeltaTimestamp; + + /** + * map of blockletidAndPageId to + * deleted rows + */ + private Map<String, DeleteDeltaVo> deletedRowsMap; + /** * @return the segmentProperties */ public SegmentProperties getSegmentProperties() { @@ -124,4 +136,33 @@ public abstract class 
AbstractIndex implements Cacheable { public void setMemorySize(long memorySize) { this.memorySize = memorySize; } + + /** + * @return latest deleted delta timestamp + */ + public long getDeleteDeltaTimestamp() { + return deleteDeltaTimestamp; + } + + /** + * set the latest delete delta timestamp + * @param deleteDeltaTimestamp + */ + public void setDeleteDeltaTimestamp(long deleteDeltaTimestamp) { + this.deleteDeltaTimestamp = deleteDeltaTimestamp; + } + + /** + * @return the deleted record for block map + */ + public Map<String, DeleteDeltaVo> getDeletedRowsMap() { + return deletedRowsMap; + } + + /** + * @param deletedRowsMap + */ + public void setDeletedRowsMap(Map<String, DeleteDeltaVo> deletedRowsMap) { + this.deletedRowsMap = deletedRowsMap; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index 8fbaa4a..44347cf 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -72,14 +72,20 @@ public class TableBlockInfo implements Distributable, Serializable { private Map<String, String> blockStorageIdMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + /** + * delete delta files path for this block + */ + private String[] deletedDeltaFilePath; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, - long blockLength, ColumnarFormatVersion version) { + long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { this.filePath = FileFactory.getUpdatedFilePath(filePath); this.blockOffset = blockOffset; this.segmentId 
= segmentId; this.locations = locations; this.blockLength = blockLength; this.version = version; + this.deletedDeltaFilePath = deletedDeltaFilePath; } /** @@ -93,8 +99,9 @@ public class TableBlockInfo implements Distributable, Serializable { * @param blockletInfos */ public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, - long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) { - this(filePath, blockOffset, segmentId, locations, blockLength, version); + long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version, + String[] deletedDeltaFilePath) { + this(filePath, blockOffset, segmentId, locations, blockLength, version, deletedDeltaFilePath); this.blockletInfos = blockletInfos; } @@ -112,8 +119,9 @@ public class TableBlockInfo implements Distributable, Serializable { */ public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version, - Map<String, String> blockStorageIdMap) { - this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version); + Map<String, String> blockStorageIdMap, String[] deletedDeltaFilePath) { + this(filePath, blockOffset, segmentId, locations, blockLength, blockletInfos, version, + deletedDeltaFilePath); this.blockStorageIdMap = blockStorageIdMap; } @@ -307,4 +315,8 @@ public class TableBlockInfo implements Distributable, Serializable { public void setBlockStorageIdMap(Map<String, String> blockStorageIdMap) { this.blockStorageIdMap = blockStorageIdMap; } + + public String[] getDeletedDeltaFilePath() { + return deletedDeltaFilePath; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index fef5905..b5a632f 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -800,4 +800,20 @@ public class CarbonUpdateUtil { } + /** + * Below method will be used to get the latest delete delta file timestamp + * @param deleteDeltaFiles + * @return latest delete delta file time stamp + */ + public static long getLatestDeleteDeltaTimestamp(String[] deleteDeltaFiles) { + long latestTimestamp = 0; + for (int i = 0; i < deleteDeltaFiles.length; i++) { + long convertTimeStampToLong = Long.parseLong( + CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(deleteDeltaFiles[i])); + if (latestTimestamp < convertTimeStampToLong) { + latestTimestamp = convertTimeStampToLong; + } + } + return latestTimestamp; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java index 7df5f22..0f54f3a 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java @@ -21,9 +21,6 @@ import java.io.Serializable; import java.util.Set; import java.util.TreeSet; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; - /** * This class stores the blocklet details of delete delta file */ @@ -35,12 +32,6 @@ public class DeleteDeltaBlockletDetails implements Serializable { private Set<Integer> deletedRows; - /** - * LOGGER - */ - private static final LogService 
LOGGER = - LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName()); - public DeleteDeltaBlockletDetails(String id, Integer pageId) { this.id = id; deletedRows = new TreeSet<Integer>(); @@ -84,7 +75,11 @@ public class DeleteDeltaBlockletDetails implements Serializable { } @Override public int hashCode() { - return id.hashCode(); + return id.hashCode() + pageId.hashCode(); + } + + public String getBlockletKey() { + return this.id + '_' + this.pageId; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java new file mode 100644 index 0000000..d68e4e9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.mutate; + +import java.util.BitSet; +import java.util.Iterator; +import java.util.Set; + +/** + * Class which keep the information about the rows + * while got deleted + */ +public class DeleteDeltaVo { + + /** + * deleted rows bitset + */ + private BitSet bitSet; + + public DeleteDeltaVo() { + bitSet = new BitSet(); + } + + /** + * Below method will be used to insert the rows + * which are deleted + * + * @param data + */ + public void insertData(Set<Integer> data) { + Iterator<Integer> iterator = data.iterator(); + while (iterator.hasNext()) { + bitSet.set(iterator.next()); + } + } + + /** + * below method will be used to check the row is deleted or not + * + * @param counter + * @return + */ + public boolean containsRow(int counter) { + return bitSet.get(counter); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java index e689566..417ad29 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -35,6 +36,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; import 
org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; import org.apache.carbondata.core.util.CarbonProperties; @@ -120,7 +122,52 @@ public class CarbonDeleteFilesDataReader { } } return pageIdDeleteRowsMap; + } + /** + * Below method will be used to read the delete delta files + * and get the map of blockletid and page id mapping to deleted + * rows + * + * @param deltaFiles delete delta files array + * @return map of blockletid_pageid to deleted rows + */ + public Map<String, DeleteDeltaVo> getDeletedRowsDataVo(String[] deltaFiles) { + List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>(); + ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size); + for (final String deltaFile : deltaFiles) { + taskSubmitList.add(executorService.submit(new Callable<DeleteDeltaBlockDetails>() { + @Override public DeleteDeltaBlockDetails call() throws IOException { + CarbonDeleteDeltaFileReaderImpl deltaFileReader = + new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile)); + return deltaFileReader.readJson(); + } + })); + } + try { + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOGGER.error("Error while reading the delete delta files : " + e.getMessage()); + } + Map<String, DeleteDeltaVo> pageIdToBlockLetVo = new HashMap<>(); + List<DeleteDeltaBlockletDetails> blockletDetails = null; + for (int i = 0; i < taskSubmitList.size(); i++) { + try { + blockletDetails = taskSubmitList.get(i).get().getBlockletDetails(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + for (DeleteDeltaBlockletDetails blockletDetail : blockletDetails) { + DeleteDeltaVo deleteDeltaVo = pageIdToBlockLetVo.get(blockletDetail.getBlockletKey()); + if (null == deleteDeltaVo) { + deleteDeltaVo = new DeleteDeltaVo(); + 
pageIdToBlockLetVo.put(blockletDetail.getBlockletKey(), deleteDeltaVo); + } + deleteDeltaVo.insertData(blockletDetail.getDeletedRows()); + } + } + return pageIdToBlockLetVo; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java index d4d16d0..dba92ad 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java @@ -17,9 +17,11 @@ package org.apache.carbondata.core.scan.collector.impl; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; @@ -90,8 +92,6 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect int[] surrogateResult; String[] noDictionaryKeys; byte[][] complexTypeKeyArray; - BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = - scannedResult.getDeleteDeltaDataCache(); while (scannedResult.hasNext() && rowCounter < batchSize) { Object[] row = new Object[queryDimensions.length + queryMeasures.length]; if (isDimensionExists) { @@ -108,8 +108,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect } else { 
scannedResult.incrementCounter(); } - if (null != deleteDeltaDataCache && deleteDeltaDataCache - .contains(scannedResult.getCurrentRowId(), scannedResult.getCurrentPageCounter())) { + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { continue; } fillMeasureData(scannedResult, row); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java index 478dc8c..3e82257 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java @@ -20,7 +20,6 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.model.QueryMeasure; @@ -54,15 +53,11 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector { @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { List<Object[]> listBasedResult = new ArrayList<>(batchSize); QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures(); - BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = - scannedResult.getDeleteDeltaDataCache(); // scan the record and add to list int rowCounter = 0; while (scannedResult.hasNext() && rowCounter < batchSize) { scanResultAndGetData(scannedResult); - if (null != deleteDeltaDataCache && 
deleteDeltaDataCache - .contains(scannedResult.getCurrentRowId(), - scannedResult.getCurrentPageCounter())) { + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { continue; } prepareRow(scannedResult, listBasedResult, queryMeasures); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java index 4fa1494..8f89760 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.result.AbstractScannedResult; @@ -50,8 +49,6 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe int[] surrogateResult; String[] noDictionaryKeys; byte[][] complexTypeKeyArray; - BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = - scannedResult.getDeleteDeltaDataCache(); Map<Integer, GenericQueryType> comlexDimensionInfoMap = tableBlockExecutionInfos.getComlexDimensionInfoMap(); while (scannedResult.hasNext() && rowCounter < batchSize) { @@ -80,9 +77,7 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe } else { scannedResult.incrementCounter(); } - if (null 
!= deleteDeltaDataCache && deleteDeltaDataCache - .contains(scannedResult.getCurrentRowId(), - scannedResult.getCurrentPageCounter())) { + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { continue; } fillMeasureData(scannedResult, row); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java index 2de74fa..479a684 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; @@ -152,15 +151,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) { List<Object[]> listBasedResult = new ArrayList<>(batchSize); QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures(); - BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = - scannedResult.getDeleteDeltaDataCache(); // scan the record and add to list int rowCounter = 0; while (scannedResult.hasNext() && rowCounter < batchSize) { 
scanResultAndGetData(scannedResult); - if (null != deleteDeltaDataCache && deleteDeltaDataCache - .contains(scannedResult.getCurrentRowId(), - scannedResult.getCurrentPageCounter())) { + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { continue; } // re-fill dictionary and no dictionary key arrays for the newly added columns http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 2a5c342..ba7530d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -193,7 +193,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { getBlockExecutionInfoForBlock(queryModel, queryProperties.dataBlocks.get(i), queryModel.getTableBlockInfos().get(i).getBlockletInfos().getStartBlockletNumber(), queryModel.getTableBlockInfos().get(i).getBlockletInfos().getNumberOfBlockletToScan(), - queryModel.getTableBlockInfos().get(i).getFilePath())); + queryModel.getTableBlockInfos().get(i).getFilePath(), + queryModel.getTableBlockInfos().get(i).getDeletedDeltaFilePath())); } if (null != queryModel.getStatisticsRecorder()) { QueryStatistic queryStatistic = new QueryStatistic(); @@ -214,7 +215,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { * @throws QueryExecutionException any failure during block info creation */ protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, - AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath) + 
AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath, + String[] deleteDeltaFiles) throws QueryExecutionException { BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); SegmentProperties segmentProperties = blockIndex.getSegmentProperties(); @@ -232,6 +234,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir() .length() + 1; blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength)); + blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles); blockExecutionInfo.setStartBlockletIndex(startBlockletIndex); blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan); blockExecutionInfo.setQueryDimensions(currentBlockQueryDimensions @@ -360,8 +363,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // setting the no dictionary column block indexes blockExecutionInfo.setNoDictionaryBlockIndexes(ArrayUtils.toPrimitive( noDictionaryColumnBlockIndex.toArray(new Integer[noDictionaryColumnBlockIndex.size()]))); - // setting column id to dictionary mapping - blockExecutionInfo.setColumnIdToDcitionaryMapping(queryProperties.columnToDictionayMapping); // setting each column value size blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize()); blockExecutionInfo.setComplexColumnParentBlockIndexes( http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java index b294b58..7d08dda 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java @@ -18,12 +18,12 @@ package org.apache.carbondata.core.scan.executor.infos; import java.util.Map; -import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; import org.apache.carbondata.core.scan.model.QueryDimension; @@ -101,12 +101,6 @@ public class BlockExecutionInfo { private int[] projectionListMeasureIndexes; /** - * this will be used to update the older block fixed length keys with the - * new block fixed length key - */ - private KeyStructureInfo keyStructureInfo; - - /** * first block from which query execution will start */ private DataRefNode firstDataBlock; @@ -146,12 +140,6 @@ public class BlockExecutionInfo { private Map<Integer, KeyStructureInfo> columnGroupToKeyStructureInfo; /** - * mapping of dictionary dimension to its dictionary mapping which will be - * used to get the actual data from dictionary for aggregation, sorting - */ - private Map<String, Dictionary> columnIdToDcitionaryMapping; - - /** * filter tree to execute the filter */ private FilterExecuter filterExecuterTree; @@ -230,6 +218,13 @@ public class BlockExecutionInfo { */ private AbsoluteTableIdentifier absoluteTableIdentifier; + /** + * delete delta file path + */ + private String[] deleteDeltaFilePath; + + private Map<String, DeleteDeltaVo> deletedRecordsMap; + public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { return absoluteTableIdentifier; } @@ -484,13 +479,6 @@ public class 
BlockExecutionInfo { this.columnGroupToKeyStructureInfo = columnGroupToKeyStructureInfo; } - /** - * @param columnIdToDcitionaryMapping the columnIdToDcitionaryMapping to set - */ - public void setColumnIdToDcitionaryMapping(Map<String, Dictionary> columnIdToDcitionaryMapping) { - this.columnIdToDcitionaryMapping = columnIdToDcitionaryMapping; - } - public boolean isRawRecordDetailQuery() { return isRawRecordDetailQuery; } @@ -643,4 +631,32 @@ public class BlockExecutionInfo { this.projectionListMeasureIndexes = projectionListMeasureIndexes; } + /** + * @return delete delta files + */ + public String[] getDeleteDeltaFilePath() { + return deleteDeltaFilePath; + } + + /** + * set the delete delta files + * @param deleteDeltaFilePath + */ + public void setDeleteDeltaFilePath(String[] deleteDeltaFilePath) { + this.deleteDeltaFilePath = deleteDeltaFilePath; + } + + /** + * @return deleted record map + */ + public Map<String, DeleteDeltaVo> getDeletedRecordsMap() { + return deletedRecordsMap; + } + + /** + * @param deletedRecordsMap + */ + public void setDeletedRecordsMap(Map<String, DeleteDeltaVo> deletedRecordsMap) { + this.deletedRecordsMap = deletedRecordsMap; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java new file mode 100644 index 0000000..52fa529 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/DeleteDeltaInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.executor.infos; + +import java.util.Arrays; + +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; + +/** + * class to hold information about delete delta files + */ +public class DeleteDeltaInfo { + + /** + * delete delta files + */ + private String[] deleteDeltaFile; + + /** + * latest delete delta file timestamp + */ + private long latestDeleteDeltaFileTimestamp; + + public DeleteDeltaInfo(String[] deleteDeltaFile) { + this.deleteDeltaFile = deleteDeltaFile; + this.latestDeleteDeltaFileTimestamp = + CarbonUpdateUtil.getLatestDeleteDeltaTimestamp(deleteDeltaFile); + } + + public String[] getDeleteDeltaFile() { + return deleteDeltaFile; + } + + public long getLatestDeleteDeltaFileTimestamp() { + return latestDeleteDeltaFileTimestamp; + } + + @Override public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(deleteDeltaFile); + result = + prime * result + (int) (latestDeleteDeltaFileTimestamp ^ (latestDeleteDeltaFileTimestamp + >>> 32)); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + DeleteDeltaInfo other = (DeleteDeltaInfo) obj; + if (!Arrays.equals(deleteDeltaFile, other.deleteDeltaFile)) { + return false; + } + if 
(latestDeleteDeltaFileTimestamp != other.latestDeleteDeltaFileTimestamp) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java index 1dda1aa..c24b73c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java @@ -25,11 +25,13 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; +import org.apache.carbondata.core.mutate.TupleIdEnum; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.filter.GenericQueryType; @@ -125,7 +127,20 @@ public abstract class AbstractScannedResult { */ private int[] complexParentBlockIndexes; - protected BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache; + /** + * blocklet number + '_' + page number to deleted record map + */ + private Map<String, DeleteDeltaVo> deletedRecordMap; + + /** + * current page delete delta vo + 
*/ + private DeleteDeltaVo currentDeleteDeltaVo; + + /** + * actual blocklet number + */ + private String blockletNumber; public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) { this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize(); @@ -135,6 +150,7 @@ public abstract class AbstractScannedResult { this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap(); this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes(); this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length; + this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap(); } /** @@ -393,6 +409,12 @@ public abstract class AbstractScannedResult { */ public void setBlockletId(String blockletId) { this.blockletId = CarbonTablePath.getShortBlockId(blockletId); + blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID); + // if deleted records map is present for this block + // then get the first page deleted vo + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter); + } } /** @@ -457,6 +479,9 @@ public abstract class AbstractScannedResult { pageCounter++; rowCounter = 0; currentRow = -1; + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter); + } return hasNext(); } return false; @@ -629,21 +654,6 @@ public abstract class AbstractScannedResult { public abstract String[] getNoDictionaryKeyStringArray(); /** - * @return BlockletLevelDeleteDeltaDataCache. - */ - public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { - return blockletDeleteDeltaCache; - } - - /** - * @param blockletDeleteDeltaCache - */ - public void setBlockletDeleteDeltaCache( - BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) { - this.blockletDeleteDeltaCache = blockletDeleteDeltaCache; - } - - /** * Mark the filtered rows in columnar batch. 
These rows will not be added to vector batches later. * @param columnarBatch * @param startRow @@ -653,11 +663,11 @@ public abstract class AbstractScannedResult { public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size, int vectorOffset) { int rowsFiltered = 0; - if (blockletDeleteDeltaCache != null) { + if (currentDeleteDeltaVo != null) { int len = startRow + size; for (int i = startRow; i < len; i++) { int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i; - if (blockletDeleteDeltaCache.contains(rowId, pageCounter)) { + if (currentDeleteDeltaVo.containsRow(rowId)) { columnarBatch.markFiltered(vectorOffset); rowsFiltered++; } @@ -666,4 +676,17 @@ public abstract class AbstractScannedResult { } return rowsFiltered; } + + /** + * Below method will be used to check row got deleted + * + * @param rowId + * @return is present in deleted row + */ + public boolean containsDeletedRow(int rowId) { + if (null != currentDeleteDeltaVo) { + return currentDeleteDeltaVo.containsRow(rowId); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java index a0823af..92e9594 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -18,6 +18,8 @@ package org.apache.carbondata.core.scan.result.iterator; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ExecutorService; import org.apache.carbondata.common.CarbonIterator; @@ -27,9 +29,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.datastore.DataRefNodeFinder; import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; +import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator; import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl; @@ -53,6 +59,9 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato private static final LogService LOGGER = LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName()); + private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap = + new ConcurrentHashMap<>(); + protected ExecutorService execService; /** * execution info of the block @@ -77,7 +86,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato /** * queryStatisticsModel to store query statistics object */ - QueryStatisticsModel queryStatisticsModel; + private QueryStatisticsModel queryStatisticsModel; public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, ExecutorService execService) { @@ -105,13 +114,24 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato private void intialiseInfos() { for (BlockExecutionInfo 
blockInfo : blockExecutionInfos) { - DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize()); + Map<String, DeleteDeltaVo> deletedRowsMap = null; + DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize(), + blockInfo.getDataBlock().getSegmentProperties().getNumberOfSortColumns(), + blockInfo.getDataBlock().getSegmentProperties().getNumberOfNoDictSortColumns()); + // if delete delta file is present + if (null != blockInfo.getDeleteDeltaFilePath() && 0 != blockInfo + .getDeleteDeltaFilePath().length) { + DeleteDeltaInfo deleteDeltaInfo = new DeleteDeltaInfo(blockInfo.getDeleteDeltaFilePath()); + // read and get the delete detail block details + deletedRowsMap = getDeleteDeltaDetails(blockInfo.getDataBlock(), deleteDeltaInfo); + // set the deleted row to block execution info + blockInfo.setDeletedRecordsMap(deletedRowsMap); + } DataRefNode startDataBlock = finder .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { startDataBlock = startDataBlock.getNextDataRefNode(); } - long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan(); //if number of block is less than 0 then take end block. 
if (numberOfBlockToScan <= 0) { @@ -124,6 +144,83 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato } } + /** + * Below method will be used to get the delete delta rows for a block + * + * @param dataBlock data block + * @param deleteDeltaInfo delete delta info + * @return blockid+pageid to deleted row mapping + */ + private Map<String, DeleteDeltaVo> getDeleteDeltaDetails(AbstractIndex dataBlock, + DeleteDeltaInfo deleteDeltaInfo) { + // if datablock deleted delta timestamp is more then the current delete delta files timestamp + // then return the current deleted rows + if (dataBlock.getDeleteDeltaTimestamp() >= deleteDeltaInfo + .getLatestDeleteDeltaFileTimestamp()) { + return dataBlock.getDeletedRowsMap(); + } + CarbonDeleteFilesDataReader carbonDeleteDeltaFileReader = null; + // get the lock object so in case of concurrent query only one task will read the delete delta + // files other tasks will wait + Object lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo); + // if lock object is null then add a lock object + if (null == lockObject) { + synchronized (deleteDeltaToLockObjectMap) { + // double checking + lockObject = deleteDeltaToLockObjectMap.get(deleteDeltaInfo); + if (null == lockObject) { + lockObject = new Object(); + deleteDeltaToLockObjectMap.put(deleteDeltaInfo, lockObject); + } + } + } + // double checking to check the deleted rows is already present or not + if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) { + // if not then acquire the lock + synchronized (lockObject) { + // check the timestamp again + if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo + .getLatestDeleteDeltaFileTimestamp()) { + // read the delete delta files + carbonDeleteDeltaFileReader = new CarbonDeleteFilesDataReader(); + Map<String, DeleteDeltaVo> deletedRowsMap = carbonDeleteDeltaFileReader + .getDeletedRowsDataVo(deleteDeltaInfo.getDeleteDeltaFile()); + 
setDeltedDeltaBoToDataBlock(deleteDeltaInfo, deletedRowsMap, dataBlock); + // remove the lock + deleteDeltaToLockObjectMap.remove(deleteDeltaInfo); + return deletedRowsMap; + } else { + return dataBlock.getDeletedRowsMap(); + } + } + } else { + return dataBlock.getDeletedRowsMap(); + } + } + + /** + * Below method will be used to set deleted records map to data block + * based on latest delta file timestamp + * + * @param deleteDeltaInfo + * @param deletedRecordsMap + * @param dataBlock + */ + private void setDeltedDeltaBoToDataBlock(DeleteDeltaInfo deleteDeltaInfo, + Map<String, DeleteDeltaVo> deletedRecordsMap, AbstractIndex dataBlock) { + // check if timestamp of data block is less than the latest delete delta timestamp + // then update the delete delta details and timestamp in data block + if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()) { + synchronized (dataBlock) { + if (dataBlock.getDeleteDeltaTimestamp() < deleteDeltaInfo + .getLatestDeleteDeltaFileTimestamp()) { + dataBlock.setDeletedRowsMap(deletedRecordsMap); + dataBlock.setDeleteDeltaTimestamp(deleteDeltaInfo.getLatestDeleteDeltaFileTimestamp()); + } + } + } + } + @Override public boolean hasNext() { if ((dataBlockIterator != null && dataBlockIterator.hasNext())) { return true; http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java index 0fb9782..f3d1336 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java @@ -23,8 +23,6 @@ import 
org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader; -import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; @@ -114,13 +112,6 @@ public abstract class AbstractBlockletScanner implements BlockletScanner { } } scannedResult.setNumberOfRows(numberOfRows); - // loading delete data cache in blockexecutioninfo instance - DeleteDeltaCacheLoaderIntf deleteCacheLoader = - new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(), - blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier()); - deleteCacheLoader.loadDeleteDeltaFileDataToCache(); - scannedResult - .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache()); scannedResult.setRawColumnChunks(dimensionRawColumnChunks); // adding statistics for carbon scan time QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java index 8f14b85..e710e40 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java @@ -26,8 +26,6 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader; -import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; @@ -198,17 +196,9 @@ public class FilterScanner extends AbstractBlockletScanner { indexesGroup[k] = indexes; } } - // loading delete data cache in blockexecutioninfo instance - DeleteDeltaCacheLoaderIntf deleteCacheLoader = - new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(), - blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier()); - deleteCacheLoader.loadDeleteDeltaFileDataToCache(); - scannedResult - .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache()); FileHolder fileReader = blocksChunkHolder.getFileReader(); int[][] allSelectedDimensionBlocksIndexes = blockExecutionInfo.getAllSelectedDimensionBlocksIndexes(); - long dimensionReadTime = System.currentTimeMillis(); DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock() .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 6fab563..5e6e8de 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -261,7 +261,22 @@ public class SegmentUpdateStatusManager { return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId); } - + /** + * Below method will be used to get all the delete delta files based on block name + * + * @param blockFilePath actual block filePath + * @return all delete delta files + * @throws Exception + */ + public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception { + int tableFactPathLength = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1; + String blockame = blockFilePath.substring(tableFactPathLength); + String tupleId = CarbonTablePath.getShortBlockId(blockame); + return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT) + .toArray(new String[0]); + } /** * Returns all delta file paths of specified block @@ -291,11 +306,8 @@ public class SegmentUpdateStatusManager { //blockName without timestamp final String blockNameFromTuple = blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-")); - SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray = - readLoadMetadata(); - return getDeltaFiles(file, blockNameFromTuple, listOfSegmentUpdateDetailsArray, extension, + return getDeltaFiles(file, blockNameFromTuple, extension, segment); - } catch (Exception ex) { String errorMsg = "Invalid tuple id " + tupleId; LOG.error(errorMsg); @@ -345,12 +357,11 @@ public class SegmentUpdateStatusManager { * @param extension * @return */ - public List<String> getDeltaFiles(CarbonFile 
blockDir, final String blockNameFromTuple, - SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray, + private List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple, final String extension, String segment) { - List<String> deleteFileList = null; - for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) { + List<String> deleteFileList = new ArrayList<>(); + for (SegmentUpdateDetails block : updateDetails) { if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName() .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) { final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java index c66398c..982fb50 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java @@ -62,7 +62,7 @@ public class SegmentTaskIndexStoreTest { <TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> createCache(CacheType.DRIVER_BTREE, ""); tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L, - ColumnarFormatVersion.valueOf(version)); + ColumnarFormatVersion.valueOf(version), null); absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp", new CarbonTableIdentifier("testdatabase", "testtable", "TB100")); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java ---------------------------------------------------------------------- diff 
--git a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java index 08c22ec..1b7f106 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/BlockInfoTest.java @@ -27,7 +27,7 @@ public class BlockInfoTest { static BlockInfo blockInfo; @BeforeClass public static void setup() { - blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + blockInfo = new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null)); } @Test public void hashCodeTest() { @@ -43,7 +43,7 @@ public class BlockInfoTest { @Test public void equalsTestWithSimilarObject() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null)); Boolean res = blockInfo.equals(blockInfoTest); assert (res); } @@ -60,28 +60,28 @@ public class BlockInfoTest { @Test public void equalsTestWithDifferentSegmentId() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1, null)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDifferentOffset() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 62, "segmentId", null, 6, ColumnarFormatVersion.V1, null)); Boolean res = blockInfo.equals(blockInfoTest); assert 
(!res); } @Test public void equalsTestWithDifferentBlockLength() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/filePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null)); Boolean res = blockInfo.equals(blockInfoTest); assert (!res); } @Test public void equalsTestWithDiffFilePath() { BlockInfo blockInfoTest = - new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1)); + new BlockInfo(new TableBlockInfo("/diffFilePath.carbondata", 6, "segmentId", null, 62, ColumnarFormatVersion.V1, null)); Boolean res = blockInfoTest.equals(blockInfo); assert (!res); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java index 840287e..f4553a6 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java @@ -33,8 +33,8 @@ public class TableBlockInfoTest { static TableBlockInfo tableBlockInfos; @BeforeClass public static void setup() { - tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1); - tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1); + tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null); + tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null); } @Test public 
void equalTestWithSameObject() { @@ -43,7 +43,7 @@ public class TableBlockInfoTest { } @Test public void equalTestWithSimilarObject() { - TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1, null); Boolean res = tableBlockInfo.equals(tableBlockInfoTest); assert (res); } @@ -59,52 +59,52 @@ public class TableBlockInfoTest { } @Test public void equlsTestWithDiffSegmentId() { - TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1, null); Boolean res = tableBlockInfo.equals(tableBlockInfoTest); assert (!res); } @Test public void equlsTestWithDiffBlockOffset() { - TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1, null); Boolean res = tableBlockInfo.equals(tableBlockInfoTest); assert (!res); } @Test public void equalsTestWithDiffBlockLength() { - TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1, null); Boolean res = tableBlockInfo.equals(tableBlockInfoTest); assert (!res); } @Test public void equalsTestWithDiffBlockletNumber() { TableBlockInfo tableBlockInfoTest = - new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); Boolean res = 
tableBlockInfos.equals(tableBlockInfoTest); assert (!res); } @Test public void equalsTestWithDiffFilePath() { TableBlockInfo tableBlockInfoTest = - new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); Boolean res = tableBlockInfos.equals(tableBlockInfoTest); assert (!res); } @Test public void compareToTestForSegmentId() { TableBlockInfo tableBlockInfo = - new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); int res = tableBlockInfos.compareTo(tableBlockInfo); int expectedResult = 2; assertEquals(res, expectedResult); TableBlockInfo tableBlockInfo1 = - new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); int res1 = tableBlockInfos.compareTo(tableBlockInfo1); int expectedResult1 = -1; assertEquals(res1, expectedResult1); TableBlockInfo tableBlockInfo2 = - new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); int res2 = tableBlockInfos.compareTo(tableBlockInfo2); int expectedresult2 = 1; assertEquals(res2, expectedresult2); @@ -129,18 +129,18 @@ public class TableBlockInfoTest { }; - TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1, null); int res = tableBlockInfos.compareTo(tableBlockInfo); int expectedResult = 7; 
assertEquals(res, expectedResult); - TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1); + TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1, null); int res1 = tableBlockInfos.compareTo(tableBlockInfo1); int expectedResult1 = 1; assertEquals(res1, expectedResult1); TableBlockInfo tableBlockInfoTest = - new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1, null); int res2 = tableBlockInfos.compareTo(tableBlockInfoTest); int expectedResult2 = -1; assertEquals(res2, expectedResult2); @@ -148,13 +148,13 @@ public class TableBlockInfoTest { @Test public void compareToTestWithStartBlockletNo() { TableBlockInfo tableBlockInfo = - new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1, null); int res = tableBlockInfos.compareTo(tableBlockInfo); int expectedresult =-1; assertEquals(res, expectedresult); TableBlockInfo tableBlockInfo1 = - new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1); + new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1, null); int res1 = tableBlockInfos.compareTo(tableBlockInfo1); int expectedresult1 = 1; assertEquals(res1, expectedresult1); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java index 
52c56d3..ccc7af6 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableTaskInfoTest.java @@ -33,10 +33,10 @@ public class TableTaskInfoTest { tableBlockInfoList = new ArrayList<>(5); String[] locations = { "loc1", "loc2", "loc3" }; - tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1)); + tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null)); String[] locs = { "loc4", "loc5" }; - tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1)); + tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1, null)); tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList); } @@ -67,10 +67,10 @@ public class TableTaskInfoTest { List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>(); String[] locations = { "loc1", "loc2", "loc3" }; - tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1)); + tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1, null)); String[] locations1 = { "loc1", "loc2", "loc3" }; - tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1)); + tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1, null)); List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest); assert (res.equals(locs)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 9adf4d4..badf63e 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -516,7 +516,7 @@ public class CarbonUtilTest { } }; TableBlockInfo info = - new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); + new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null); assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1); } @@ -525,7 +525,7 @@ public class CarbonUtilTest { public void testToReadMetadatFileWithException() throws Exception { TableBlockInfo info = - new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); + new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null); CarbonUtil.readMetadatFile(info); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java index 83c7fa4..8161fae 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java @@ -142,12 +142,14 @@ public class DataFileFooterConverterTest { } }; String[] arr = { "a", "b", "c" }; - TableBlockInfo tableBlockInfo = new TableBlockInfo("/file.carbondata", 3, "id", arr, 3, ColumnarFormatVersion.V1); + String fileName = "/part-0-0_batchno0-0-1495074251740.carbondata"; + TableBlockInfo tableBlockInfo = new TableBlockInfo(fileName, 3, "id", arr, 3, 
ColumnarFormatVersion.V1, null); tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3); List<TableBlockInfo> tableBlockInfoList = new ArrayList<>(); tableBlockInfoList.add(tableBlockInfo); + String idxFileName = "0_batchno0-0-1495074251740.carbonindex"; List<DataFileFooter> dataFileFooterList = - dataFileFooterConverter.getIndexInfo("indexfile", tableBlockInfoList); + dataFileFooterConverter.getIndexInfo(idxFileName, tableBlockInfoList); byte[] exp = dataFileFooterList.get(0).getBlockletIndex().getBtreeIndex().getStartKey(); byte[] res = "1".getBytes(); for (int i = 0; i < exp.length; i++) { @@ -244,7 +246,7 @@ public class DataFileFooterConverterTest { segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols()); dataFileFooter.setNumberOfRows(3); dataFileFooter.setSegmentInfo(segmentInfo); - TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1); + TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null); DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info); assertEquals(result.getNumberOfRows(), 3); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index cda34e4..5d9bbe7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -323,10 +323,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { updateStatusManager)) { continue; } + String[] deleteDeltaFilePath = null; + try { + deleteDeltaFilePath = + updateStatusManager.getDeleteDeltaFilePath(tableBlockInfo.getFilePath()); + } catch 
(Exception e) { + throw new IOException(e); + } result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), - tableBlockInfo.getVersion())); + tableBlockInfo.getVersion(), deleteDeltaFilePath)); } } return result; @@ -429,7 +436,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(), - carbonInputSplit.getBlockStorageIdMap())); + carbonInputSplit.getBlockStorageIdMap(), carbonInputSplit.getDeleteDeltaFiles())); } } return tableBlockInfoList; http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 08661a2..631bc2c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -72,6 +72,11 @@ public class CarbonInputSplit extends FileSplit private List<UpdateVO> invalidTimestampsList; + /** + * list of delete delta files for split + */ + private String[] deleteDeltaFiles; + public CarbonInputSplit() { segmentId = null; taskId = "0"; @@ -82,7 +87,7 @@ public class CarbonInputSplit extends FileSplit } private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, - ColumnarFormatVersion version) { + ColumnarFormatVersion version, String[] deleteDeltaFiles) { super(path, start, length, locations); 
this.segmentId = segmentId; String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); @@ -93,11 +98,12 @@ public class CarbonInputSplit extends FileSplit this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName()); this.invalidSegments = new ArrayList<>(); this.version = version; + this.deleteDeltaFiles = deleteDeltaFiles; } public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, - int numberOfBlocklets, ColumnarFormatVersion version) { - this(segmentId, path, start, length, locations, version); + int numberOfBlocklets, ColumnarFormatVersion version, String[] deleteDeltaFiles) { + this(segmentId, path, start, length, locations, version, deleteDeltaFiles); this.numberOfBlocklets = numberOfBlocklets; } @@ -113,8 +119,9 @@ public class CarbonInputSplit extends FileSplit * @param blockStorageIdMap */ public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, - int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap) { - this(segmentId, path, start, length, locations, numberOfBlocklets, version); + int numberOfBlocklets, ColumnarFormatVersion version, Map<String, String> blockStorageIdMap, + String[] deleteDeltaFiles) { + this(segmentId, path, start, length, locations, numberOfBlocklets, version, deleteDeltaFiles); this.blockStorageIdMap = blockStorageIdMap; } @@ -122,7 +129,7 @@ public class CarbonInputSplit extends FileSplit ColumnarFormatVersion version) throws IOException { return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(), - split.getLocations(), version); + split.getLocations(), version, null); } public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) { @@ -133,7 +140,8 @@ public class CarbonInputSplit extends FileSplit try { tableBlockInfoList.add( new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(), - 
split.getLocations(), split.getLength(), blockletInfos, split.getVersion())); + split.getLocations(), split.getLength(), blockletInfos, split.getVersion(), + split.getDeleteDeltaFiles())); } catch (IOException e) { throw new RuntimeException("fail to get location of split: " + split, e); } @@ -147,7 +155,7 @@ public class CarbonInputSplit extends FileSplit try { return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(), - blockletInfos, inputSplit.getVersion()); + blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles()); } catch (IOException e) { throw new RuntimeException("fail to get location of split: " + inputSplit, e); } @@ -167,6 +175,11 @@ public class CarbonInputSplit extends FileSplit for (int i = 0; i < numInvalidSegment; i++) { invalidSegments.add(in.readUTF()); } + int numberOfDeleteDeltaFiles = in.readInt(); + deleteDeltaFiles = new String[numberOfDeleteDeltaFiles]; + for (int i = 0; i < numberOfDeleteDeltaFiles; i++) { + deleteDeltaFiles[i] = in.readUTF(); + } } @Override public void write(DataOutput out) throws IOException { @@ -178,6 +191,12 @@ public class CarbonInputSplit extends FileSplit for (String invalidSegment : invalidSegments) { out.writeUTF(invalidSegment); } + out.writeInt(null != deleteDeltaFiles ? 
deleteDeltaFiles.length : 0); + if (null != deleteDeltaFiles) { + for (int i = 0; i < deleteDeltaFiles.length; i++) { + out.writeUTF(deleteDeltaFiles[i]); + } + } } public List<String> getInvalidSegments() { @@ -287,4 +306,8 @@ public class CarbonInputSplit extends FileSplit public Map<String, String> getBlockStorageIdMap() { return blockStorageIdMap; } + + public String[] getDeleteDeltaFiles() { + return deleteDeltaFiles; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java index 7ba6133..f9dc178 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java @@ -90,7 +90,7 @@ class InMemoryBTreeIndex implements Index { result.add(new CarbonInputSplit(segment.getId(), new Path(tableBlockInfo.getFilePath()), tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(), tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), - tableBlockInfo.getVersion())); + tableBlockInfo.getVersion(), null)); } return result; } @@ -142,7 +142,8 @@ class InMemoryBTreeIndex implements Index { tableBlockInfoList.add( new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), segment.getId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(), - blockletInfos, carbonInputSplit.getVersion())); + blockletInfos, carbonInputSplit.getVersion(), + carbonInputSplit.getDeleteDeltaFiles())); } return tableBlockInfoList; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 4ebbf60..2898870 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -300,7 +300,8 @@ class CarbonMergerRDD[K, V]( carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => { val blockInfo = new TableBlockInfo(entry.getPath.toString, entry.getStart, entry.getSegmentId, - entry.getLocations, entry.getLength, entry.getVersion + entry.getLocations, entry.getLength, entry.getVersion, + updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString) ) !CarbonUtil .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManager) http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 3d2e35b..dfea7d7 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -564,7 +564,7 @@ object CarbonDataRDDFactory { val fileSplit = inputSplit.asInstanceOf[FileSplit] new TableBlockInfo(fileSplit.getPath.toString, fileSplit.getStart, "1", - 
fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1 + fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null ).asInstanceOf[Distributable] } // group blocks to nodes, tasks http://git-wip-us.apache.org/repos/asf/carbondata/blob/da952e82/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index cab78fe..96a8062 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -577,7 +577,7 @@ object CarbonDataRDDFactory { val fileSplit = inputSplit.asInstanceOf[FileSplit] new TableBlockInfo(fileSplit.getPath.toString, fileSplit.getStart, "1", - fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1 + fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null ).asInstanceOf[Distributable] } // group blocks to nodes, tasks