http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java deleted file mode 100644 index 14bed0a..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java +++ /dev/null @@ -1,1371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.merger; - -import java.io.File; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.locks.CarbonLockFactory; -import org.apache.carbondata.core.locks.ICarbonLock; -import org.apache.carbondata.core.locks.LockUsage; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; -import org.apache.carbondata.core.mutate.SegmentUpdateDetails; -import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.path.CarbonStorePath; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; -import org.apache.carbondata.processing.model.CarbonLoadModel; -import org.apache.carbondata.spark.load.CarbonLoaderUtil; - -/** - * utility class for load merging. 
- */ -public final class CarbonDataMergerUtil { - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName()); - - private CarbonDataMergerUtil() { - - } - - /** - * Returns the size of all the carbondata files present in the segment. - * @param carbonFile - * @return - */ - private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) { - long factSize = 0; - - // carbon data file case. - CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return CarbonTablePath.isCarbonDataFile(file.getName()); - } - }); - - for (CarbonFile fact : factFile) { - factSize += fact.getSize(); - } - - return factSize; - } - - /** - * To check whether the merge property is enabled or not. - * - * @return - */ - - public static boolean checkIfAutoLoadMergingRequired() { - // load merge is not supported as per new store format - // moving the load merge check in early to avoid unnecessary load listing causing IOException - // check whether carbons segment merging operation is enabled or not. - // default will be false. - String isLoadMergeEnabled = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, - CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE); - if (isLoadMergeEnabled.equalsIgnoreCase("false")) { - return false; - } - return true; - } - - /** - * Form the Name of the New Merge Folder - * - * @param segmentsToBeMergedList - * @return - */ - public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) { - String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName(); - if (firstSegmentName.contains(".")) { - String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf(".")); - String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1); - int fraction = Integer.parseInt(afterDecimal) + 1; - String mergedSegmentName = beforeDecimal + "." + fraction; - return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName; - } else { - String mergeName = firstSegmentName + "." + 1; - return CarbonCommonConstants.LOAD_FOLDER + mergeName; - } - - } - - - /** - * Form the Name of the New Merge Folder - * - * @param segmentToBeMerged - * @return - */ - public static String getMergedLoadName(final String segmentToBeMerged) { - String firstSegmentName = segmentToBeMerged; - if (firstSegmentName.contains(".")) { - String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf(".")); - String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1); - int fraction = Integer.parseInt(afterDecimal) + 1; - String mergedSegmentName = beforeDecimal + "." + fraction; - return mergedSegmentName; - } else { - String mergeName = firstSegmentName + "." + 1; - return mergeName; - } - - } - - /** - * Update Both Segment Update Status and Table Status for the case of IUD Delete - * delta compaction. 
- * - * @param loadsToMerge - * @param metaDataFilepath - * @param carbonLoadModel - * @return - */ - public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus( - List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath, - CarbonLoadModel carbonLoadModel) { - - boolean status = false; - boolean updateLockStatus = false; - boolean tableLockStatus = false; - - String timestamp = carbonLoadModel.getFactTimeStamp(); - - List<String> updatedDeltaFilesList = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - // This routine updateLoadMetadataIUDCompactionMergeStatus is suppose to update - // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with - // Table Status Metadata file (For Update Block Compaction) it has to update the - // Table Update Status Metadata File (For corresponding Delete Delta File). - // As the IUD_UPDDEL_DELTA_COMPACTION going to write in the same segment therefore in - // A) Table Update Status Metadata File (Block Level) - // * For each blocks which is being compacted Mark 'Compacted' as the Status. - // B) Table Status Metadata file (Segment Level) - // * loadStatus won't be changed to "compacted' - // * UpdateDeltaStartTime and UpdateDeltaEndTime will be both set to current - // timestamp (which is being passed from driver) - // First the Table Update Status Metadata File should be updated as we need to get - // the updated blocks for the segment from Table Status Metadata Update Delta Start and - // End Timestamp. - - // Table Update Status Metadata Update. - AbsoluteTableIdentifier absoluteTableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); - - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - - ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); - ICarbonLock statusLock = segmentStatusManager.getTableStatusLock(); - - // Update the Compacted Blocks with Compacted Status. 
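// Note (annotation, not part of the original file): each entry in
// updatedDeltaFilesList is a full delta file path; the loop below reduces it
// to a bare block name by taking the text between the last path separator
// and the last '-', e.g. ".../part-0-3-1481084721319.carbondata" -> "part-0-3"
// (the example filename follows the pattern shown later in this file).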
- try { - updatedDeltaFilesList = segmentUpdateStatusManager - .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName().toString()); - } catch (Exception e) { - LOGGER.error("Error while getting the Update Delta Blocks."); - status = false; - return status; - } - - if (updatedDeltaFilesList.size() > 0) { - try { - updateLockStatus = updateLock.lockWithRetries(); - tableLockStatus = statusLock.lockWithRetries(); - - List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size()); - - for (String compactedBlocks : updatedDeltaFilesList) { - // Try to BlockName - String fullBlock = compactedBlocks; - int endIndex = fullBlock.lastIndexOf(File.separator); - String blkNoExt = fullBlock.substring(endIndex + 1, fullBlock.lastIndexOf("-")); - blockNames.add(blkNoExt); - } - - if (updateLockStatus && tableLockStatus) { - - SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager - .readLoadMetadata(); - - for (String compactedBlocks : blockNames) { - // Check is the compactedBlocks name matches with oldDetails - for (int i = 0; i < updateLists.length; i++) { - if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks) - && !CarbonCommonConstants.COMPACTED.equalsIgnoreCase(updateLists[i].getStatus()) - && !CarbonCommonConstants.MARKED_FOR_DELETE - .equalsIgnoreCase(updateLists[i].getStatus())) { - updateLists[i].setStatus(CarbonCommonConstants.COMPACTED); - } - } - } - - LoadMetadataDetails[] loadDetails = - segmentStatusManager.readLoadMetadata(metaDataFilepath); - - for (LoadMetadataDetails loadDetail : loadDetails) { - if (loadsToMerge.contains(loadDetail)) { - loadDetail.setUpdateDeltaStartTimestamp(timestamp); - loadDetail.setUpdateDeltaEndTimestamp(timestamp); - if (loadDetail.getLoadName().equalsIgnoreCase("0")) { - loadDetail - .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); - } - } - } - - try { - segmentUpdateStatusManager - .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); - segmentStatusManager - .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); - status = true; - } catch (IOException e) { - LOGGER.error( - "Error while writing metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); - status = false; - } - } else { - LOGGER.error("Not able to acquire the lock."); - status = false; - } - } catch (Exception e) { - LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); - status = false; - - } finally { - if (updateLockStatus) { - if (updateLock.unlock()) { - LOGGER.info("Unlock the segment update lock successfully."); - } else { - LOGGER.error("Not able to unlock the segment update lock."); - } - } - if (tableLockStatus) { - if (statusLock.unlock()) { - LOGGER.info("Unlock the table status lock successfully."); - } else { - LOGGER.error("Not able to unlock the table status lock."); - } - } - } - } - return status; - } - - /** - * method to update table status in case of IUD Update Delta Compaction. 
- * @param loadsToMerge - * @param metaDataFilepath - * @param MergedLoadName - * @param carbonLoadModel - * @param compactionType - * @return - */ - public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge, - String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel, - long mergeLoadStartTime, CompactionType compactionType) { - - boolean tableStatusUpdationStatus = false; - AbsoluteTableIdentifier absoluteTableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - - ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); - - try { - if (carbonLock.lockWithRetries()) { - LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "." - + carbonLoadModel.getTableName() + " for table status updation "); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - String statusFilePath = carbonTablePath.getTableStatusFilePath(); - - LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); - - String mergedLoadNumber = MergedLoadName.substring( - MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) - + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length()); - - long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime(); - for (LoadMetadataDetails loadDetail : loadDetails) { - // check if this segment is merged. - if (loadsToMerge.contains(loadDetail)) { - // if the compacted load is deleted after the start of the compaction process, - // then need to discard the compaction process and treat it as failed compaction. - if (loadDetail.getLoadStatus() - .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) { - LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName() - + " is deleted after the compaction is started."); - return false; - } - loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED); - loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp); - loadDetail.setMergedLoadName(mergedLoadNumber); - } - } - - // create entry for merged one. - LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails(); - loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId()); - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS); - long loadEnddate = CarbonUpdateUtil.readCurrentTime(); - loadMetadataDetails.setLoadEndTime(loadEnddate); - loadMetadataDetails.setLoadName(mergedLoadNumber); - loadMetadataDetails.setLoadStartTime(mergeLoadStartTime); - loadMetadataDetails.setPartitionCount("0"); - // if this is a major compaction then set the segment as major compaction. 
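// Note (annotation): the "majorCompacted" flag set below is what
// identifySegmentsToBeMergedBasedOnSegCount later checks, so that
// major-compacted segments are never picked up again by minor compaction.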
- if (compactionType == CompactionType.MAJOR_COMPACTION) { - loadMetadataDetails.setMajorCompacted("true"); - } - - List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails)); - - // put the merged folder entry - updatedDetailsList.add(loadMetadataDetails); - - try { - SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath, - updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()])); - tableStatusUpdationStatus = true; - } catch (IOException e) { - LOGGER.error("Error while writing metadata"); - } - } else { - LOGGER.error( - "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "." - + carbonLoadModel.getTableName() + "for table status updation"); - } - } finally { - if (carbonLock.unlock()) { - LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel - .getDatabaseName() + "." + carbonLoadModel.getTableName()); - } else { - LOGGER.error( - "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "." - + carbonLoadModel.getTableName() + " during table status updation"); - } - } - return tableStatusUpdationStatus; - } - - /** - * To identify which all segments can be merged. - * - * @param storeLocation - * @param carbonLoadModel - * @param compactionSize - * @return - */ - public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation, - CarbonLoadModel carbonLoadModel, long compactionSize, - List<LoadMetadataDetails> segments, CompactionType compactionType) { - - List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments); - - sortSegments(sortedSegments); - - // Check for segments which are qualified for IUD compaction. - if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) { - - List<LoadMetadataDetails> listOfSegmentsToBeMerged = - identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel); - - return listOfSegmentsToBeMerged; - } - - // check preserve property and preserve the configured number of latest loads. - - List<LoadMetadataDetails> listOfSegmentsAfterPreserve = - checkPreserveSegmentsPropertyReturnRemaining(sortedSegments); - - // filter the segments if the compaction based on days is configured. - - List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval = - identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve); - List<LoadMetadataDetails> listOfSegmentsToBeMerged; - // identify the segments to merge based on the Size of the segments across partition. - if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) { - - listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize, - listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation); - } else { - - listOfSegmentsToBeMerged = - identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval); - } - - return listOfSegmentsToBeMerged; - } - - /** - * Sorting of the segments. - * @param segments - */ - public static void sortSegments(List segments) { - // sort the segment details. 
- Collections.sort(segments, new Comparator<LoadMetadataDetails>() { - @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) { - double seg1Id = Double.parseDouble(seg1.getLoadName()); - double seg2Id = Double.parseDouble(seg2.getLoadName()); - if (seg1Id - seg2Id < 0) { - return -1; - } - if (seg1Id - seg2Id > 0) { - return 1; - } - return 0; - } - }); - } - - /** - * This method will return the list of loads which are loaded at the same interval. - * This property is configurable. - * - * @param listOfSegmentsBelowThresholdSize - * @return - */ - private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate( - List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) { - - List<LoadMetadataDetails> loadsOfSameDate = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - long numberOfDaysAllowedToMerge = 0; - try { - numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, - CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)); - if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) { - LOGGER.error( - "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT - + " is incorrect." - + " Correct value should be in range of 0 -100. Taking the default value."); - numberOfDaysAllowedToMerge = - Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT); - } - - } catch (NumberFormatException e) { - numberOfDaysAllowedToMerge = - Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT); - } - // if true then process loads according to the load date. - if (numberOfDaysAllowedToMerge > 0) { - - // filter loads based on the loaded date - boolean first = true; - Date segDate1 = null; - SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); - for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) { - - if (first) { - segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf); - first = false; - continue; - } - long segmentDate = segment.getLoadStartTime(); - Date segDate2 = null; - try { - segDate2 = sdf.parse(sdf.format(segmentDate)); - } catch (ParseException e) { - LOGGER.error("Error while parsing segment start time" + e.getMessage()); - } - - if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) { - loadsOfSameDate.add(segment); - } - // if the load is beyond merged date. - // then reset everything and continue search for loads. - else if (loadsOfSameDate.size() < 2) { - loadsOfSameDate.clear(); - // need to add the next segment as first and to check further - segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf); - } else { // case where a load is beyond merge date and there is at least 2 loads to merge. 
- break; - } - } - } else { - return listOfSegmentsBelowThresholdSize; - } - - return loadsOfSameDate; - } - - /** - * @param loadsOfSameDate - * @param segment - * @return - */ - private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate, - LoadMetadataDetails segment, SimpleDateFormat sdf) { - long baselineLoadStartTime = segment.getLoadStartTime(); - Date segDate1 = null; - try { - segDate1 = sdf.parse(sdf.format(baselineLoadStartTime)); - } catch (ParseException e) { - LOGGER.error("Error while parsing segment start time" + e.getMessage()); - } - loadsOfSameDate.add(segment); - return segDate1; - } - - /** - * Method to check if the load dates are complied to the configured dates. - * - * @param segDate1 - * @param segDate2 - * @return - */ - private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2, - long numberOfDaysAllowedToMerge) { - if (segDate1 == null || segDate2 == null) { - return false; - } - // take 1 st date add the configured days . - Calendar cal1 = Calendar.getInstance(); - cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate()); - Calendar cal2 = Calendar.getInstance(); - cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate()); - - long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis(); - - if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) { - return true; - } - return false; - } - - /** - * Identify the segments to be merged based on the Size in case of Major compaction. - * - * @param compactionSize - * @param listOfSegmentsAfterPreserve - * @param carbonLoadModel - * @param storeLocation - * @return - */ - private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize( - long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve, - CarbonLoadModel carbonLoadModel, String storeLocation) { - - List<LoadMetadataDetails> segmentsToBeMerged = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - CarbonTableIdentifier tableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(); - - - // total length - long totalLength = 0; - - // check size of each segment , sum it up across partitions - for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) { - - String segId = segment.getLoadName(); - // variable to store one segment size across partition. - long sizeOfOneSegmentAcrossPartition = - getSizeOfSegment(storeLocation, tableIdentifier, segId); - - // if size of a segment is greater than the Major compaction size. then ignore it. - if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) { - // if already 2 segments have been found for merging then stop scan here and merge. - if (segmentsToBeMerged.size() > 1) { - break; - } else { // if only one segment is found then remove the earlier one in list. - // reset the total length to 0. - segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - totalLength = 0; - continue; - } - } - - totalLength += sizeOfOneSegmentAcrossPartition; - - // in case of major compaction the size doesnt matter. all the segments will be merged. - if (totalLength < (compactionSize * 1024 * 1024)) { - segmentsToBeMerged.add(segment); - } else { // if already 2 segments have been found for merging then stop scan here and merge. - if (segmentsToBeMerged.size() > 1) { - break; - } else { // if only one segment is found then remove the earlier one in list and put this. 
- // reset the total length to the current identified segment. - segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - segmentsToBeMerged.add(segment); - totalLength = sizeOfOneSegmentAcrossPartition; - } - } - - } - - return segmentsToBeMerged; - } - - /** - * For calculating the size of the specified segment - * @param storeLocation - * @param tableIdentifier - * @param segId - * @return - */ - private static long getSizeOfSegment(String storeLocation, - CarbonTableIdentifier tableIdentifier, String segId) { - String loadPath = CarbonLoaderUtil - .getStoreLocation(storeLocation, tableIdentifier, segId); - CarbonFile segmentFolder = - FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); - return getSizeOfFactFileInLoad(segmentFolder); - } - - /** - * Identify the segments to be merged based on the segment count - * - * @param listOfSegmentsAfterPreserve - * @return - */ - private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount( - List<LoadMetadataDetails> listOfSegmentsAfterPreserve) { - - List<LoadMetadataDetails> mergedSegments = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<LoadMetadataDetails> unMergedSegments = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - int[] noOfSegmentLevelsCount = - CarbonProperties.getInstance().getCompactionSegmentLevelCount(); - - int level1Size = 0; - int level2Size = 0; - int size = noOfSegmentLevelsCount.length; - - if (size >= 2) { - level1Size = noOfSegmentLevelsCount[0]; - level2Size = noOfSegmentLevelsCount[1]; - } else if (size == 1) { - level1Size = noOfSegmentLevelsCount[0]; - } - - int unMergeCounter = 0; - int mergeCounter = 0; - - // check size of each segment , sum it up across partitions - for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) { - - String segName = segment.getLoadName(); - - // if a segment is already merged 2 levels then it s name will become .2 - // need to exclude those segments from minor compaction. - // if a segment is major compacted then should not be considered for minor. - if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || ( - segment.isMajorCompacted() != null && segment.isMajorCompacted() - .equalsIgnoreCase("true"))) { - continue; - } - - // check if the segment is merged or not - - if (!isMergedSegment(segName)) { - //if it is an unmerged segment then increment counter - unMergeCounter++; - unMergedSegments.add(segment); - if (unMergeCounter == (level1Size)) { - return unMergedSegments; - } - } else { - mergeCounter++; - mergedSegments.add(segment); - if (mergeCounter == (level2Size)) { - return mergedSegments; - } - } - } - return new ArrayList<>(0); - } - - /** - * To check if the segment is merged or not. - * @param segName - * @return - */ - private static boolean isMergedSegment(String segName) { - if (segName.contains(".")) { - return true; - } - return false; - } - - /** - * checks number of loads to be preserved and returns remaining valid segments - * - * @param segments - * @return - */ - private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining( - List<LoadMetadataDetails> segments) { - // check whether the preserving of the segments from merging is enabled or not. - // get the number of loads to be preserved. - int numberOfSegmentsToBePreserved = - CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved(); - // get the number of valid segments and retain the latest loads from merging. 
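// For example (annotation): with 10 valid segments and a preserve count of
// 2, the two most recent loads are removed from the candidate list and the
// remaining 8 stay eligible for merging.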
- return CarbonDataMergerUtil - .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved); - } - - /** - * Retain the number of segments which are to be preserved and return the remaining list of - * segments. - * - * @param loadMetadataDetails - * @param numberOfSegToBeRetained - * @return - */ - private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining( - List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) { - - List<LoadMetadataDetails> validList = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - for (LoadMetadataDetails segment : loadMetadataDetails) { - if (isSegmentValid(segment)) { - validList.add(segment); - } - } - - // check if valid list is big enough for removing the number of seg to be retained. - // last element - int removingIndex = validList.size() - 1; - - for (int i = validList.size(); i > 0; i--) { - if (numberOfSegToBeRetained == 0) { - break; - } - // remove last segment - validList.remove(removingIndex--); - numberOfSegToBeRetained--; - } - return validList; - - } - - /** - * This will give the compaction sizes configured based on compaction type. - * - * @param compactionType - * @return - */ - public static long getCompactionSize(CompactionType compactionType) { - - long compactionSize = 0; - switch (compactionType) { - case MAJOR_COMPACTION: - compactionSize = CarbonProperties.getInstance().getMajorCompactionSize(); - break; - default: // this case can not come. - } - return compactionSize; - } - - /** - * For getting the comma separated valid segments for merging. - * - * @param loadMetadataDetails - * @return - */ - public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) { - StringBuilder builder = new StringBuilder(); - for (LoadMetadataDetails segment : loadMetadataDetails) { - //check if this load is an already merged load. - if (null != segment.getMergedLoadName()) { - builder.append(segment.getMergedLoadName()).append(","); - } else { - builder.append(segment.getLoadName()).append(","); - } - } - builder.deleteCharAt(builder.length() - 1); - return builder.toString(); - } - - /** - * This method returns the valid segments attached to the table Identifier. - * - * @param absoluteTableIdentifier - * @return - */ - public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier) - throws IOException { - - SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null; - try { - validAndInvalidSegments = - new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); - } catch (IOException e) { - LOGGER.error("Error while getting valid segment list for a table identifier"); - throw new IOException(); - } - return validAndInvalidSegments.getValidSegments(); - } - - - /** - * Removing the already merged segments from list. - */ - public static List<LoadMetadataDetails> filterOutNewlyAddedSegments( - List<LoadMetadataDetails> segments, - LoadMetadataDetails lastSeg) { - - // take complete list of segments. - List<LoadMetadataDetails> list = new ArrayList<>(segments); - // sort list - CarbonDataMergerUtil.sortSegments(list); - - // first filter out newly added segments. - return list.subList(0, list.indexOf(lastSeg) + 1); - - } - - /** - * method to identify the segments qualified for merging in case of IUD Compaction. 
- * - * @param carbonLoadModel - * @param compactionType - * @return - */ - private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD( - List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) { - - List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size()); - - AbsoluteTableIdentifier absoluteTableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - - int numberUpdateDeltaFilesThreshold = - CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); - for (LoadMetadataDetails seg : segments) { - if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(), - absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(), - numberUpdateDeltaFilesThreshold)) { - validSegments.add(seg); - } - } - return validSegments; - } - - private static boolean isSegmentValid(LoadMetadataDetails seg) { - return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - || seg.getLoadStatus() - .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg - .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE); - } - - /** - * method gets the segments list which get qualified for IUD compaction. - * @param Segments - * @param absoluteTableIdentifier - * @param compactionTypeIUD - * @return - */ - public static List<String> getSegListIUDCompactionQualified(List<String> Segments, - AbsoluteTableIdentifier absoluteTableIdentifier, - SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) { - - List<String> validSegments = new ArrayList<>(); - - if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) { - int numberDeleteDeltaFilesThreshold = - CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction(); - List<String> deleteSegments = new ArrayList<>(); - for (String seg : Segments) { - if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager, - numberDeleteDeltaFilesThreshold)) { - deleteSegments.add(seg); - } - } - if (deleteSegments.size() > 0) { - // This Code Block Append the Segname along with the Blocks selected for Merge instead of - // only taking the segment name. This will help to parallelize better for each block - // in case of Delete Horizontal Compaction. - for (String segName : deleteSegments) { - List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager, - numberDeleteDeltaFilesThreshold); - if (tempSegments != null) { - for (String tempSeg : tempSegments) { - validSegments.add(tempSeg); - } - } - } - } - } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) { - int numberUpdateDeltaFilesThreshold = - CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); - for (String seg : Segments) { - if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager, - numberUpdateDeltaFilesThreshold)) { - validSegments.add(seg); - } - } - } - return validSegments; - } - - /** - * Check if the blockname of the segment belongs to the Valid Update Delta List or not. 
- * @param seg - * @param blkName - * @param segmentUpdateStatusManager - * @return - */ - public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName, - SegmentUpdateStatusManager segmentUpdateStatusManager) { - - List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg); - - String fullBlock = blkName; - String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR); - String blockName = FileParts[FileParts.length - 1]; - - for (String str : list) { - if (str.contains(blockName)) { - return true; - } - } - return false; - } - - /** - * This method traverses Update Delta Files inside the seg and return true - * if UpdateDelta Files are more than IUD Compaction threshold. - * - * @param seg - * @param absoluteTableIdentifier - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return - */ - public static Boolean checkUpdateDeltaFilesInSeg(String seg, - AbsoluteTableIdentifier absoluteTableIdentifier, - SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { - - CarbonFile[] updateDeltaFiles = null; - Set<String> uniqueBlocks = new HashSet<String>(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); - CarbonFile[] allSegmentFiles = segDir.listFiles(); - - updateDeltaFiles = segmentUpdateStatusManager - .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, - false, allSegmentFiles); - - if (updateDeltaFiles == null) { - return false; - } - - // The update Delta files may have Spill over blocks. Will consider multiple spill over - // blocks as one. Currently updateDeltaFiles array contains Update Delta Block name which - // lies within UpdateDelta Start TimeStamp and End TimeStamp. In order to eliminate - // Spill Over Blocks will choose files with unique taskID. - for (CarbonFile blocks : updateDeltaFiles) { - // Get Task ID and the Timestamp from the Block name for e.g. - // part-0-3-1481084721319.carbondata => "3-1481084721319" - String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName()); - String timestamp = - CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName()); - String taskAndTimeStamp = task + "-" + timestamp; - uniqueBlocks.add(taskAndTimeStamp); - } - if (uniqueBlocks.size() > numberDeltaFilesThreshold) { - return true; - } else { - return false; - } - } - - /** - * Check is the segment passed qualifies for IUD delete delta compaction or not i.e. - * if the number of delete delta files present in the segment is more than - * numberDeltaFilesThreshold. - * - * @param seg - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return - */ - private static boolean checkDeleteDeltaFilesInSeg(String seg, - SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { - - Set<String> uniqueBlocks = new HashSet<String>(); - List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); - - for (final String blockName : blockNameList) { - - CarbonFile[] deleteDeltaFiles = - segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); - - // The Delete Delta files may have Spill over blocks. 
Will consider multiple spill over - // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which - // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate - // Spill Over Blocks will choose files with unique taskID. - for (CarbonFile blocks : deleteDeltaFiles) { - // Get Task ID and the Timestamp from the Block name for e.g. - // part-0-3-1481084721319.carbondata => "3-1481084721319" - String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName()); - String timestamp = - CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName()); - String taskAndTimeStamp = task + "-" + timestamp; - uniqueBlocks.add(taskAndTimeStamp); - } - - if (uniqueBlocks.size() > numberDeltaFilesThreshold) { - return true; - } - } - return false; - } - - /** - * Check is the segment passed qualifies for IUD delete delta compaction or not i.e. - * if the number of delete delta files present in the segment is more than - * numberDeltaFilesThreshold. - * @param seg - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return - */ - - private static List<String> getDeleteDeltaFilesInSeg(String seg, - SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { - - List<String> blockLists = new ArrayList<>(); - List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); - - for (final String blockName : blockNameList) { - - CarbonFile[] deleteDeltaFiles = - segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); - - if (deleteDeltaFiles.length > numberDeltaFilesThreshold) { - blockLists.add(seg + "/" + blockName); - } - } - return blockLists; - } - - /** - * Returns true is horizontal compaction is enabled. - * @return - */ - public static boolean isHorizontalCompactionEnabled() { - if ((CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled, - CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) { - return true; - } else { - return false; - } - } - - /** - * method to compact Delete Delta files in case of IUD Compaction. - * - * @param seg - * @param blockName - * @param absoluteTableIdentifier - * @param segmentUpdateDetails - * @param timestamp - * @return - * @throws IOException - */ - public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg, - String blockName, AbsoluteTableIdentifier absoluteTableIdentifier, - SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException { - - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); - - List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1); - - // set the update status. 
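// Note (annotation): this method merges every delete delta file of one
// block into a single file named <blockName>-<timestamp> with the delete
// delta extension, written next to the originals (see destFileName below).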
- segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails); - - CarbonFile[] deleteDeltaFiles = - segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); - - String destFileName = - blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT; - String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath() - + CarbonCommonConstants.FILE_SEPARATOR + destFileName; - - List<String> deleteFilePathList = new ArrayList<String>(); - for (CarbonFile cFile : deleteDeltaFiles) { - deleteFilePathList.add(cFile.getCanonicalPath()); - } - - CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult(); - blockDetails.setBlockName(blockName); - blockDetails.setSegmentName(seg); - blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString()); - blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString()); - - try { - if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) { - blockDetails.setCompactionStatus(true); - } else { - blockDetails.setCompactionStatus(false); - } - resultList.add(blockDetails); - } catch (IOException e) { - LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is " - + fullBlockFilePath); - throw new IOException(); - } - return resultList; - } - - /** - * this method compact the delete delta files. - * @param deleteDeltaFiles - * @param blockName - * @param fullBlockFilePath - * @return - */ - public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles, - String blockName, String fullBlockFilePath) throws IOException { - - DeleteDeltaBlockDetails deleteDeltaBlockDetails = null; - CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); - try { - deleteDeltaBlockDetails = - dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName); - } catch (Exception e) { - String blockFilePath = fullBlockFilePath - .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); - LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath); - throw new IOException(); - } - CarbonDeleteDeltaWriterImpl carbonDeleteWriter = - new CarbonDeleteDeltaWriterImpl(fullBlockFilePath, - FileFactory.getFileType(fullBlockFilePath)); - try { - carbonDeleteWriter.write(deleteDeltaBlockDetails); - } catch (IOException e) { - LOGGER.error("Error while writing compacted delete delta file " + fullBlockFilePath); - throw new IOException(); - } - return true; - } - - public static Boolean updateStatusFile( - List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table, - String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) { - - List<SegmentUpdateDetails> segmentUpdateDetails = - new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size()); - - - // Check the list output. 
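// Note (annotation): every result in the list must report a successful
// compaction; a single failed block makes this method return false before
// any status file is touched (see the "else return false" at the end of
// this loop).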
- for (CarbonDataMergerUtilResult carbonDataMergerUtilResult : updateDataMergerDetailsList) { - if (carbonDataMergerUtilResult.getCompactionStatus()) { - SegmentUpdateDetails tempSegmentUpdateDetails = new SegmentUpdateDetails(); - tempSegmentUpdateDetails.setSegmentName(carbonDataMergerUtilResult.getSegmentName()); - tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName()); - - for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager - .getUpdateStatusDetails()) { - if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName()) - && origDetails.getSegmentName() - .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) { - - tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock()); - tempSegmentUpdateDetails.setStatus(origDetails.getStatus()); - break; - } - } - - tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp( - carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp()); - tempSegmentUpdateDetails - .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp()); - - segmentUpdateDetails.add(tempSegmentUpdateDetails); - } else return false; - } - - CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true); - - // Update the Table Status. - String metaDataFilepath = table.getMetaDataFilepath(); - AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - String tableStatusPath = carbonTablePath.getTableStatusFilePath(); - - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - - ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); - - boolean lockStatus = false; - - try { - lockStatus = carbonLock.lockWithRetries(); - if (lockStatus) { - LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() - + " for table status updation"); - - LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); - - for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { - if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { - loadMetadata.setUpdateStatusFileName( - CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); - } - } - try { - segmentStatusManager - .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); - } catch (IOException e) { - return false; - } - } else { - LOGGER.error("Not able to acquire the lock for Table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); - } - } finally { - if (lockStatus) { - if (carbonLock.unlock()) { - LOGGER.info( - "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); - } else { - LOGGER.error( - "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); - } - } - } - return true; - } - - /** - * This will update the property of segments as major compacted. 
- * @param model - * @param changedSegDetails - */ - public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model, - List<LoadMetadataDetails> changedSegDetails, - List<LoadMetadataDetails> preservedSegment) throws Exception { - - String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath(); - AbsoluteTableIdentifier absoluteTableIdentifier = - model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath); - List<LoadMetadataDetails> originalList = Arrays.asList(details); - for (LoadMetadataDetails segment : changedSegDetails) { - if (preservedSegment.contains(segment)) { - continue; - } - originalList.get(originalList.indexOf(segment)).setMajorCompacted("true"); - - } - - - ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj( - model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(), - LockUsage.TABLE_STATUS_LOCK); - - try { - if (carbonTableStatusLock.lockWithRetries()) { - LOGGER.info( - "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName() - + " for table status updation "); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - - segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), - originalList.toArray(new LoadMetadataDetails[originalList.size()])); - } else { - LOGGER.error( - "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model - .getTableName() + "for table status updation"); - throw new Exception("Failed to update the MajorCompactionStatus."); - } - } catch (IOException e) { - LOGGER.error("Error while writing metadata"); - throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage()); - } finally { - if (carbonTableStatusLock.unlock()) { - LOGGER.info( - "Table unlocked successfully after table status updation" + model.getDatabaseName() - + "." + model.getTableName()); - } else { - LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model - .getTableName() + " during table status updation"); - } - } - - } -}
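A note on the segment naming used throughout the class above: getMergedLoadName appends or increments a ".N" suffix, so a first merge of segment 2 produces 2.1 and a re-merge of 2.1 produces 2.2, with the configured load-folder prefix prepended for the table-status entry. The following is a minimal standalone restatement of that rule as a hypothetical helper (name and placement are illustrative, not part of the diff):

static String nextMergedName(String segmentName) {
  // "2" -> "2.1" (first merge), "2.1" -> "2.2" (merge of a merged segment)
  int dot = segmentName.indexOf('.');
  if (dot < 0) {
    return segmentName + ".1";
  }
  int fraction = Integer.parseInt(segmentName.substring(dot + 1)) + 1;
  return segmentName.substring(0, dot) + "." + fraction;
}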
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java deleted file mode 100644 index 214a231..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.merger; - -import org.apache.carbondata.core.mutate.SegmentUpdateDetails; - -public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails { - private boolean compactionStatus; - - public boolean getCompactionStatus() { - return compactionStatus; - } - - public void setCompactionStatus(Boolean status) { - compactionStatus = status; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java deleted file mode 100644 index d1cb9d9..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.merger; - -import java.util.concurrent.Callable; - -import org.apache.carbondata.spark.rdd.Compactor; - -import org.apache.spark.sql.execution.command.CompactionCallableModel; - -/** - * Callable class which is used to trigger the compaction in a separate callable. - */ -public class CompactionCallable implements Callable<Void> { - - private final CompactionCallableModel compactionCallableModel; - - public CompactionCallable(CompactionCallableModel compactionCallableModel) { - - this.compactionCallableModel = compactionCallableModel; - } - - @Override public Void call() throws Exception { - - Compactor.triggerCompaction(compactionCallableModel); - return null; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java deleted file mode 100644 index 6cfe8b5..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java +++ /dev/null @@ -1,563 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.carbondata.spark.merger; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.scan.result.BatchResult; -import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.model.CarbonLoadModel; -import org.apache.carbondata.processing.schema.metadata.SortObserver; -import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; -import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; -import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; -import org.apache.carbondata.processing.store.CarbonDataFileAttributes; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; -import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger; -import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -import org.pentaho.di.core.exception.KettleException; - -/** - * This class will process the query result and convert the data - * into a format compatible for data load - */ -public class CompactionResultSortProcessor { - - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName()); - /** - * carbon load model that contains all the required information for load - */ - private CarbonLoadModel carbonLoadModel; - /** - * sortDataRows instance for sorting each row read ad writing to sort temp file - */ - private SortDataRows sortDataRows; - /** - * segment proeprties which contains required information for a segment - */ - private SegmentProperties segmentProperties; - /** - * segment information of parent table - */ - private SegmentProperties srcSegmentProperties; - /** - * final merger for merge sort - */ - private SingleThreadFinalSortFilesMerger finalMerger; - /** - * data handler VO object - */ - private CarbonFactDataHandlerColumnar dataHandler; - /** - * column cardinality - */ - private int[] columnCardinality; - /** - * Fact Table To Index Table Column Mapping order - */ - private int[] factToIndexColumnMapping; - /** - * Fact Table Dict Column to Index Table Dict Column Mapping - */ - private int[] factToIndexDictColumnMapping; - /** - * boolean mapping for 
no dictionary columns in schema - */ - private boolean[] noDictionaryColMapping; - /** - * agg type defined for measures - */ - private char[] aggType; - /** - * segment id - */ - private String segmentId; - /** - * index table name - */ - private String indexTableName; - /** - * temp store location to be used during data load - */ - private String tempStoreLocation; - /** - * database name - */ - private String databaseName; - /** - * no dictionary column count in schema - */ - private int noDictionaryCount; - /** - * implicit column count in schema - */ - private int implicitColumnCount; - /** - * total count of measures in schema - */ - private int measureCount; - /** - * dimension count excluding complex dimensions and no dictionary columns - */ - private int dimensionColumnCount; - /** - * complex dimension count in schema - */ - private int complexDimensionCount; - /** - * carbon table - */ - private CarbonTable carbonTable; - /** - * whether the allocated tasks have any records - */ - private boolean isRecordFound; - - /** - * @param carbonLoadModel - * @param columnCardinality - * @param segmentId - * @param indexTableName - * @param factToIndexColumnMapping - * @param factToIndexDictColumnMapping - */ - public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, int[] columnCardinality, - String segmentId, String indexTableName, int[] factToIndexColumnMapping, - int[] factToIndexDictColumnMapping) { - this.carbonLoadModel = carbonLoadModel; - this.columnCardinality = columnCardinality; - this.segmentId = segmentId; - this.indexTableName = indexTableName; - this.databaseName = carbonLoadModel.getDatabaseName(); - this.factToIndexColumnMapping = factToIndexColumnMapping; - this.factToIndexDictColumnMapping = factToIndexDictColumnMapping; - initSegmentProperties(); - } - - /** - * This method will iterate over the query result and convert it into a format suitable - * for data loading - * - * @param detailQueryResultIteratorList - */ - public void processQueryResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList) - throws Exception { - try { - initTempStoreLocation(); - initSortDataRows(); - processResult(detailQueryResultIteratorList); - // After a delete command, if no records are fetched from a split, - // the below steps need not be executed. 
- if (isRecordFound) { - initializeFinalThreadMergerForMergeSort(); - initDataHandler(); - readAndLoadDataFromSortTempFiles(); - } - } finally { - // clear temp files and folders created during secondary index creation - deleteTempStoreLocation(); - } - } - - /** - * This method will clean up the local folders and files created for secondary index creation - */ - private void deleteTempStoreLocation() { - if (null != tempStoreLocation) { - try { - CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) }); - } catch (IOException | InterruptedException e) { - LOGGER.error( - "Problem deleting local folders during secondary index creation: " + e.getMessage()); - } - } - } - - /** - * This method will iterate over the query result and perform row sorting operation - * - * @param detailQueryResultIteratorList - */ - private void processResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList) - throws Exception { - for (CarbonIterator<BatchResult> detailQueryIterator : detailQueryResultIteratorList) { - while (detailQueryIterator.hasNext()) { - BatchResult batchResult = detailQueryIterator.next(); - while (batchResult.hasNext()) { - addRowForSorting(prepareRowObjectForSorting(batchResult.next())); - isRecordFound = true; - } - } - } - try { - sortDataRows.startSorting(); - } catch (CarbonSortKeyAndGroupByException e) { - LOGGER.error(e); - throw new Exception("Problem loading data while creating secondary index: " + e.getMessage()); - } - } - - /** - * This method will prepare the row from the raw query result object so that it can take part - * in sorting - * - * @param row - * @return - */ - private Object[] prepareRowObjectForSorting(Object[] row) { - ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0]; - // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount]; - - List<CarbonDimension> dimensions = segmentProperties.getDimensions(); - Object[] preparedRow = new Object[dimensions.size() + measureCount]; - - // convert the dictionary from MDKey to surrogate key - byte[] dictionaryKey = wrapper.getDictionaryKey(); - long[] keyArray = srcSegmentProperties.getDimensionKeyGenerator().getKeyArray(dictionaryKey); - Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount]; - // Re-ordering is required as per index table column dictionary order, - // as the output dictionary byte array is as per parent table schema order - for (int i = 0; i < keyArray.length; i++) { - dictionaryValues[factToIndexDictColumnMapping[i]] = Long.valueOf(keyArray[i]).intValue(); - } - - int noDictionaryIndex = 0; - int dictionaryIndex = 0; - int i = 0; - // loop excluding the last dimension as the last one is the implicit column. 
- for (; i < dimensions.size() - 1; i++) { - CarbonDimension dims = dimensions.get(i); - if (dims.hasEncoding(Encoding.DICTIONARY)) { - // dictionary - preparedRow[i] = dictionaryValues[dictionaryIndex++]; - } else { - // no dictionary dims - preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++); - } - } - - // at last, add the implicit column position reference (PID) - - preparedRow[i] = wrapper.getImplicitColumnByteArray(); - return preparedRow; - } - - /** - * This method will read the sort temp files, perform merge sort and add the result to the - * store for data loading - */ - private void readAndLoadDataFromSortTempFiles() throws Exception { - try { - Object[] previousRow = null; - finalMerger.startFinalMerge(); - while (finalMerger.hasNext()) { - Object[] rowRead = finalMerger.next(); - // convert the row from surrogate key to MDKey - // Object[] outputRow = CarbonDataProcessorUtil - // .processNoKettle(rowRead, segmentProperties, aggType, measureCount, noDictionaryCount, - // complexDimensionCount); - Object[] outputRow = null; - dataHandler.addDataToStore(outputRow); - } - dataHandler.finish(); - } catch (CarbonDataWriterException e) { - LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); - } catch (Exception e) { - LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); - } finally { - if (null != dataHandler) { - try { - dataHandler.closeHandler(); - } catch (CarbonDataWriterException e) { - LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); - } - } - } - } - - /** - * This method is used to process the row without kettle. - * - * @param row input row - * @param segmentProperties - * @param aggType - * @param measureCount - * @param noDictionaryCount - * @param complexDimCount - * @return - * @throws KettleException - */ - public static Object[] processNoKettle(Object[] row, SegmentProperties segmentProperties, - char[] aggType, int measureCount, int noDictionaryCount, int complexDimCount) - throws KettleException { - - // int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex(); - // - // int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex(); - // - // int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex(); - - int measureIndex = 0; - - int noDimByteArrayIndex = 0; - - int dimsArrayIndex = 0; - - Object[] outputRow; - // adding one for the high cardinality dims byte array. 
- if (noDictionaryCount > 0 || complexDimCount > 0) { - outputRow = new Object[measureCount + 1 + 1]; - } else { - outputRow = new Object[measureCount + 1]; - } - - int l = 0; - int index = 0; - Object[] measures = (Object[]) row[measureIndex]; - for (int i = 0; i < measureCount; i++) { - outputRow[l++] = measures[index++]; - } - outputRow[l] = row[noDimByteArrayIndex]; - - int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length]; - int[] dimsArray = (int[]) row[dimsArrayIndex]; - for (int i = 0; i < highCardExcludedRows.length; i++) { - highCardExcludedRows[i] = dimsArray[i]; - } - try { - outputRow[outputRow.length - 1] = - segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows); - } catch (KeyGenException e) { - throw new KettleException("unable to generate the mdkey", e); - } - return outputRow; - } - - /** - * initialise segment properties - */ - private void initSegmentProperties() { - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName); - List<ColumnSchema> columnSchemaList = CarbonUtil - .getColumnSchemaList(carbonTable.getDimensionByTableName(indexTableName), - carbonTable.getMeasureByTableName(indexTableName)); - segmentProperties = new SegmentProperties(columnSchemaList, columnCardinality); - srcSegmentProperties = - new SegmentProperties(getParentColumnOrder(columnSchemaList), getParentOrderCardinality()); - } - - /** - * Convert index table column order into parent table column order - */ - private List<ColumnSchema> getParentColumnOrder(List<ColumnSchema> columnSchemaList) { - List<ColumnSchema> parentColumnList = new ArrayList<ColumnSchema>(columnSchemaList.size()); - for (int i = 0; i < columnSchemaList.size(); i++) { - // Extra cols are dummy_measure & positionId implicit column - if (i >= columnCardinality.length) { - parentColumnList.add(columnSchemaList.get(i)); - } else { - parentColumnList.add(columnSchemaList.get(factToIndexColumnMapping[i])); - } - } - return parentColumnList; - } - - /** - * Convert index table column cardinality order into parent table column order - */ - private int[] getParentOrderCardinality() { - int[] parentColumnCardinality = new int[columnCardinality.length]; - for (int i = 0; i < columnCardinality.length; i++) { - parentColumnCardinality[i] = columnCardinality[factToIndexColumnMapping[i]]; - } - return parentColumnCardinality; - } - - /** - * add the row to a temp array which will be written to a sort temp file after sorting - * - * @param row - */ - private void addRowForSorting(Object[] row) throws Exception { - try { - sortDataRows.addRow(row); - } catch (CarbonSortKeyAndGroupByException e) { - LOGGER.error(e); - throw new Exception( - "Row addition for sorting failed while creating secondary index: " + e.getMessage()); - } - } - - /** - * create an instance of sort data rows - */ - private void initSortDataRows() throws Exception { - CarbonTable indexTable = CarbonMetadata.getInstance().getCarbonTable( - carbonLoadModel.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + indexTableName); - measureCount = indexTable.getMeasureByTableName(indexTableName).size(); - implicitColumnCount = indexTable.getImplicitDimensionByTableName(indexTableName).size(); - SortObserver observer = new SortObserver(); - List<CarbonDimension> dimensions = indexTable.getDimensionByTableName(indexTableName); - noDictionaryColMapping = new boolean[dimensions.size()]; - 
int i = 0; - for (CarbonDimension dimension : dimensions) { - if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) { - i++; - continue; - } - noDictionaryColMapping[i++] = true; - noDictionaryCount++; - } - dimensionColumnCount = dimensions.size(); - SortParameters parameters = createSortParameters(); - SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters); - this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger); - try { - this.sortDataRows.initialize(); - } catch (CarbonSortKeyAndGroupByException e) { - LOGGER.error(e); - throw new Exception( - "Error initializing sort data rows object while creating secondary index: " + e - .getMessage()); - } - } - - /** - * This method will create the sort parameters VO object - * - * @return - */ - private SortParameters createSortParameters() { - return SortParameters - .createSortParameters(databaseName, indexTableName, dimensionColumnCount, - complexDimensionCount, measureCount, noDictionaryCount, - carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(), - noDictionaryColMapping); - } - - /** - * create an instance of the final thread merger which will perform merge sort on all the - * sort temp files - */ - private void initializeFinalThreadMergerForMergeSort() { - String sortTempFileLocation = tempStoreLocation + CarbonCommonConstants.FILE_SEPARATOR - + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; - initAggType(); - // kettle will not be used - boolean useKettle = false; - finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation, indexTableName, - dimensionColumnCount, complexDimensionCount, measureCount, noDictionaryCount, aggType, - noDictionaryColMapping, useKettle); - } - - /** - * initialise carbon data writer instance - */ - private void initDataHandler() throws Exception { - CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel(); - carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality()); - CarbonDataFileAttributes carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(carbonLoadModel.getTaskNo()), - carbonLoadModel.getFactTimeStamp()); - carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); - if (segmentProperties.getNumberOfNoDictionaryDimension() > 0 - || segmentProperties.getComplexDimensions().size() > 0) { - carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1); - } else { - carbonFactDataHandlerModel.setMdKeyIndex(measureCount); - } - carbonFactDataHandlerModel.setColCardinality(columnCardinality); - carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); - // NO-Kettle. 
- carbonFactDataHandlerModel.setUseKettle(false); - dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); - try { - dataHandler.initialise(); - } catch (CarbonDataWriterException e) { - LOGGER.error(e); - throw new Exception( - "Problem initialising data handler while creating secondary index: " + e.getMessage()); - } - } - - /** - * This method will create a model object for carbon fact data handler - * - * @return - */ - private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel() { - CarbonFactDataHandlerModel carbonFactDataHandlerModel = null; - // CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonLoaderUtil - // .getCarbonFactDataHandlerModel(carbonLoadModel, segmentProperties, databaseName, - // indexTableName, tempStoreLocation, carbonLoadModel.getStorePath()); - return carbonFactDataHandlerModel; - } - - /** - * initialise the temporary store location - */ - private void initTempStoreLocation() { - tempStoreLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(databaseName, indexTableName, carbonLoadModel.getTaskNo(), - carbonLoadModel.getPartitionId(), segmentId, false); - } - - /** - * initialise the aggregation type of each measure for its storage format - */ - private void initAggType() { - aggType = new char[measureCount]; - Arrays.fill(aggType, 'n'); - carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName); - List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(indexTableName); - for (int i = 0; i < measureCount; i++) { - aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java deleted file mode 100644 index c6b8dda..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.merger; - -/** - * This enum defines the types of compaction: Minor, Major, and - * the IUD compactions performed after UPDATE/DELETE operations - * (update-delta and delete-delta compaction). 
- */ -public enum CompactionType { - MINOR_COMPACTION, - MAJOR_COMPACTION, - IUD_UPDDEL_DELTA_COMPACTION, - IUD_DELETE_DELTA_COMPACTION, - NONE -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java deleted file mode 100644 index 8647526..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.merger; - -import org.apache.carbondata.core.datastore.block.Distributable; - -/** - * Block to Node mapping - */ -public class NodeBlockRelation implements Comparable<NodeBlockRelation> { - - private final Distributable block; - private final String node; - - public NodeBlockRelation(Distributable block, String node) { - this.block = block; - this.node = node; - - } - - public Distributable getBlock() { - return block; - } - - public String getNode() { - return node; - } - - @Override public int compareTo(NodeBlockRelation obj) { - return this.getNode().compareTo(obj.getNode()); - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof NodeBlockRelation)) { - return false; - } - NodeBlockRelation o = (NodeBlockRelation) obj; - return node.equals(o.node); - } - - @Override public int hashCode() { - return node.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java deleted file mode 100644 index 1bf69af..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.merger; - -import java.util.List; - -import org.apache.carbondata.core.datastore.block.Distributable; - -public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> { - - private final List<Distributable> blocks; - private final String node; - - public NodeMultiBlockRelation(String node, List<Distributable> blocks) { - this.node = node; - this.blocks = blocks; - - } - - public List<Distributable> getBlocks() { - return blocks; - } - - public String getNode() { - return node; - } - - @Override public int compareTo(NodeMultiBlockRelation obj) { - return this.blocks.size() - obj.getBlocks().size(); - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof NodeMultiBlockRelation)) { - return false; - } - NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj; - return blocks.equals(o.blocks) && node.equals(o.node); - } - - @Override public int hashCode() { - return blocks.hashCode() + node.hashCode(); - } -}
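A note on the row preparation above: prepareRowObjectForSorting in CompactionResultSortProcessor decodes the parent table's MDKey into surrogate keys and then scatters them into index-table column order through factToIndexDictColumnMapping. Below is a minimal, self-contained sketch of just that scatter step; the array contents and the class name DictReorderSketch are illustrative, not part of CarbonData.

import java.util.Arrays;

public class DictReorderSketch {
  public static void main(String[] args) {
    // surrogate keys decoded from the MDKey, in parent (fact) table schema order
    long[] keyArray = { 11L, 22L, 33L };
    // factToIndexDictColumnMapping[i] is the index-table position of fact column i
    int[] factToIndexDictColumnMapping = { 2, 0, 1 };
    Object[] dictionaryValues = new Object[keyArray.length];
    for (int i = 0; i < keyArray.length; i++) {
      // scatter: the value of fact column i lands at its index-table position
      dictionaryValues[factToIndexDictColumnMapping[i]] = (int) keyArray[i];
    }
    // prints [22, 33, 11]
    System.out.println(Arrays.toString(dictionaryValues));
  }
}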
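Similarly, processNoKettle assembles its output row as measures first, then the packed no-dictionary byte array, with the generated MDKey in the last slot. A toy illustration of that layout with plain arrays (all values made up):

public class OutputRowLayoutSketch {
  public static void main(String[] args) {
    Object[] measures = { 1.5, 42L };      // measure values
    byte[] noDictBytes = { 0x01, 0x02 };   // packed no-dictionary dimensions
    byte[] mdKey = { 0x0A, 0x0B };         // MDKey built by the key generator
    // one extra slot for the no-dictionary byte array, one for the MDKey
    Object[] outputRow = new Object[measures.length + 1 + 1];
    int l = 0;
    for (Object measure : measures) {
      outputRow[l++] = measure;
    }
    outputRow[l] = noDictBytes;
    outputRow[outputRow.length - 1] = mdKey;
  }
}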
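NodeBlockRelation's natural ordering by node name exists so that a flat list of (block, node) pairs can be sorted and grouped per node. A hedged sketch of how such a grouping might look, assuming the class above is still on the classpath in the same package; groupByNode is a hypothetical helper, not a CarbonData API.

package org.apache.carbondata.spark.merger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.block.Distributable;

public final class NodeGroupingSketch {
  // Collapse a flat list of (block, node) relations into blocks-per-node.
  public static Map<String, List<Distributable>> groupByNode(List<NodeBlockRelation> relations) {
    // the natural ordering clusters each node's relations together
    Collections.sort(relations);
    Map<String, List<Distributable>> nodeToBlocks =
        new LinkedHashMap<String, List<Distributable>>();
    for (NodeBlockRelation relation : relations) {
      List<Distributable> blocks = nodeToBlocks.get(relation.getNode());
      if (blocks == null) {
        blocks = new ArrayList<Distributable>();
        nodeToBlocks.put(relation.getNode(), blocks);
      }
      blocks.add(relation.getBlock());
    }
    return nodeToBlocks;
  }
}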
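NodeMultiBlockRelation, by contrast, orders itself by ascending block count, so picking the least-loaded node falls out of Collections.min. Again a sketch under the same classpath assumption; pickLeastLoadedNode is illustrative only.

package org.apache.carbondata.spark.merger;

import java.util.Collections;
import java.util.List;

public final class LeastLoadedNodeSketch {
  // compareTo orders by ascending block count, so the minimum element
  // is the node currently mapped to the fewest blocks.
  public static String pickLeastLoadedNode(List<NodeMultiBlockRelation> relations) {
    return Collections.min(relations).getNode();
  }
}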