[CARBONDATA-2196] Take CarbonTable from loadmodel during streaming ingestion
This closes #1991 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/65234b27 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/65234b27 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/65234b27 Branch: refs/heads/branch-1.3 Commit: 65234b27df8abd16fa3e7e21c0bf72fa5440aa4e Parents: 6d3105b Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Thu Feb 22 18:29:57 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sat Mar 3 18:05:04 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 6 ++-- .../hadoop/api/CarbonTableInputFormat.java | 2 +- .../merger/CompactionResultSortProcessor.java | 6 ++-- .../util/CarbonDataProcessorUtil.java | 36 ++++++++++++++++---- 4 files changed, 35 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 6555d6c..020d6c9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -176,16 +176,16 @@ public final class TableDataMap extends OperationEventListener { * @return * @throws IOException */ - public List<String> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp) + public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp) throws IOException { - List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); for (Segment segment : segments) { List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment); for (DataMap dataMap : dataMaps) { if (dataMap.isScanRequired(filterExp)) { // If any one task in a given segment contains the data that means the segment need to // be scanned and we need to validate further data maps in the same segment - prunedSegments.add(segment.getSegmentNo()); + prunedSegments.add(segment); break; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 96b0b21..3dbf04f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -910,7 +910,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { /** * return valid segment to access */ - private Segment[] getSegmentsToAccess(JobContext job) { + public Segment[] getSegmentsToAccess(JobContext job) { String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); if (segmentString.trim().isEmpty()) { return new Segment[0]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 2fbdf4f..e7c4502 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -430,9 +430,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { */ private void initTempStoreLocation() { tempStoreLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName, - carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, - true, false); + .getLocalDataFolderLocation(carbonTable, tableName, carbonLoadModel.getTaskNo(), + carbonLoadModel.getPartitionId(), segmentId, true, false); } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 1e648e1..cfc9fa3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -117,22 +117,25 @@ public final class CarbonDataProcessorUtil { } } } + /** + * * This method will form the local data folder store location * - * @param databaseName - * @param tableName + * @param carbonTable * @param taskId * @param partitionId * @param segmentId + * @param isCompactionFlow + * @param isAltPartitionFlow * @return */ - public static String[] getLocalDataFolderLocation(String databaseName, String tableName, + public static String[] getLocalDataFolderLocation(CarbonTable carbonTable, String tableName, String taskId, String partitionId, String segmentId, boolean isCompactionFlow, boolean isAltPartitionFlow) { String tempLocationKey = - getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow, - isAltPartitionFlow); + getTempStoreLocationKey(carbonTable.getDatabaseName(), tableName, + segmentId, taskId, isCompactionFlow, isAltPartitionFlow); String baseTempStorePath = CarbonProperties.getInstance() .getProperty(tempLocationKey); if (baseTempStorePath == null) { @@ -145,7 +148,6 @@ public final class CarbonDataProcessorUtil { String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator); String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length]; - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); for (int i = 0 ; i < baseTmpStorePathArray.length; i++) { String tmpStore = baseTmpStorePathArray[i]; CarbonTablePath carbonTablePath = @@ -159,6 +161,26 @@ public final class CarbonDataProcessorUtil { } /** + * This method will form the local data folder store location + * + * @param databaseName + * @param tableName + * @param taskId + * @param partitionId + * @param segmentId + * @param isCompactionFlow + * @param isAltPartitionFlow + * @return + */ + public static String[] getLocalDataFolderLocation(String databaseName, String tableName, + String taskId, String partitionId, String segmentId, boolean isCompactionFlow, + boolean isAltPartitionFlow) { + CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); + return getLocalDataFolderLocation(carbonTable, tableName, taskId, partitionId, + segmentId, isCompactionFlow, isAltPartitionFlow); + } + + /** * This method will form the key for getting the temporary location set in carbon properties * * @param databaseName @@ -587,4 +609,4 @@ public final class CarbonDataProcessorUtil { return isRawDataRequired; } -} \ No newline at end of file +}