http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java deleted file mode 100644 index a0ce24a..0000000 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.metadata; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; -import org.apache.carbondata.core.fileoperations.FileWriteOperation; -import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; -import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatus; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataFileFooterConverter; -import org.apache.carbondata.core.util.path.CarbonStorePath; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; - -import com.google.gson.Gson; - -/** - * Provide read and write support for partition mapping file in each segment - */ -public class PartitionMapFileStore { - - private Map<String, List<String>> partitionMap = new HashMap<>(); - - private boolean partionedSegment = false; - /** - * Write partitionmapp file to the segment folder with indexfilename and corresponding partitions. 
- * - * @param segmentPath - * @param taskNo - * @param partionNames - * @throws IOException - */ - public void writePartitionMapFile(String segmentPath, final String taskNo, - List<String> partionNames) throws IOException { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); - // write partition info to new file. - if (carbonFile.exists() && partionNames.size() > 0) { - CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return file.getName().startsWith(taskNo) && file.getName() - .endsWith(CarbonTablePath.INDEX_FILE_EXT); - } - }); - if (carbonFiles != null && carbonFiles.length > 0) { - PartitionMapper partitionMapper = new PartitionMapper(); - Map<String, List<String>> partitionMap = new HashMap<>(); - partitionMap.put(carbonFiles[0].getName(), partionNames); - partitionMapper.setPartitionMap(partitionMap); - String path = segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT; - writePartitionFile(partitionMapper, path); - } - } - } - - private void writePartitionFile(PartitionMapper partitionMapper, String path) throws IOException { - AtomicFileOperations fileWrite = - new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); - BufferedWriter brWriter = null; - DataOutputStream dataOutputStream = null; - Gson gsonObjectToWrite = new Gson(); - try { - dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); - - String metadataInstance = gsonObjectToWrite.toJson(partitionMapper); - brWriter.write(metadataInstance); - } finally { - if (null != brWriter) { - brWriter.flush(); - } - CarbonUtil.closeStreams(brWriter); - fileWrite.close(); - } - } - - /** - * Merge all partition files in a segment to single file. - * - * @param segmentPath - * @throws IOException - */ - public void mergePartitionMapFiles(String segmentPath, String mergeFileName) throws IOException { - CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); - if (partitionFiles != null && partitionFiles.length > 0) { - PartitionMapper partitionMapper = null; - for (CarbonFile file : partitionFiles) { - PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath()); - if (partitionMapper == null && localMapper != null) { - partitionMapper = localMapper; - } - if (localMapper != null) { - partitionMapper = partitionMapper.merge(localMapper); - } - } - if (partitionMapper != null) { - String path = segmentPath + "/" + mergeFileName + CarbonTablePath.PARTITION_MAP_EXT; - writePartitionFile(partitionMapper, path); - for (CarbonFile file : partitionFiles) { - if (!FileFactory.deleteAllCarbonFilesOfDir(file)) { - throw new IOException("Old partition map files cannot be deleted"); - } - } - } - } - } - - private String getPartitionFilePath(String segmentPath) { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); - if (carbonFile.exists()) { - CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT); - } - }); - if (partitionFiles != null && partitionFiles.length > 0) { - partionedSegment = true; - int i = 0; - // Get the latest partition map file based on the timestamp of that file. 
- long[] partitionTimestamps = new long[partitionFiles.length]; - for (CarbonFile file : partitionFiles) { - partitionTimestamps[i++] = Long.parseLong(file.getName() - .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); - } - Arrays.sort(partitionTimestamps); - return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1] - + CarbonTablePath.PARTITION_MAP_EXT; - } - } - return null; - } - - private String getPartitionFilePath(CarbonFile[] carbonFiles, String segmentPath) { - - List<CarbonFile> partitionFiles = new ArrayList<>(); - for (CarbonFile file : carbonFiles) { - if (file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT)) { - partitionFiles.add(file); - } - } - if (partitionFiles.size() > 0) { - partionedSegment = true; - int i = 0; - // Get the latest partition map file based on the timestamp of that file. - long[] partitionTimestamps = new long[partitionFiles.size()]; - for (CarbonFile file : partitionFiles) { - partitionTimestamps[i++] = Long.parseLong(file.getName() - .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); - } - Arrays.sort(partitionTimestamps); - return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1] - + CarbonTablePath.PARTITION_MAP_EXT; - } - return null; - } - - private CarbonFile[] getPartitionFiles(String segmentPath) { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); - if (carbonFile.exists()) { - return carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT); - } - }); - } - return null; - } - - /** - * This method reads the partition file - * - * @param partitionMapPath - * @return - */ - private PartitionMapper readPartitionMap(String partitionMapPath) throws IOException { - Gson gsonObjectToRead = new Gson(); - DataInputStream dataInputStream = null; - BufferedReader buffReader = null; - InputStreamReader inStream = null; - PartitionMapper partitionMapper; - AtomicFileOperations fileOperation = - new AtomicFileOperationsImpl(partitionMapPath, FileFactory.getFileType(partitionMapPath)); - - try { - if (!FileFactory.isFileExist(partitionMapPath, FileFactory.getFileType(partitionMapPath))) { - return null; - } - dataInputStream = fileOperation.openForRead(); - inStream = new InputStreamReader(dataInputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); - buffReader = new BufferedReader(inStream); - partitionMapper = gsonObjectToRead.fromJson(buffReader, PartitionMapper.class); - } finally { - if (inStream != null) { - CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); - } - } - - return partitionMapper; - } - - /** - * Reads all partitions which existed inside the passed segment path - * @param segmentPath - */ - public void readAllPartitionsOfSegment(String segmentPath) throws IOException { - String partitionFilePath = getPartitionFilePath(segmentPath); - if (partitionFilePath != null) { - partionedSegment = true; - PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); - partitionMap.putAll(partitionMapper.getPartitionMap()); - } - } - - /** - * Reads all partitions which existed inside the passed segment path - * @param carbonFiles - */ - public void readAllPartitionsOfSegment(CarbonFile[] carbonFiles, String segmentPath) - throws IOException { - String partitionFilePath = getPartitionFilePath(carbonFiles, segmentPath); - if (partitionFilePath != null) { - partionedSegment = 
true; - PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); - partitionMap.putAll(partitionMapper.getPartitionMap()); - } - } - - public boolean isPartionedSegment() { - return partionedSegment; - } - - /** - * Drops the partitions from the partition mapper file of the segment and writes to a new file. - * @param segmentPath - * @param partitionsToDrop - * @param uniqueId - * @param partialMatch If it is true then even the partial partition spec matches also can be - * dropped - * @throws IOException - */ - public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop, - String uniqueId, boolean partialMatch) throws IOException { - readAllPartitionsOfSegment(segmentPath); - List<String> indexesToDrop = new ArrayList<>(); - for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) { - for (List<String> partitions: partitionsToDrop) { - if (partialMatch) { - if (entry.getValue().containsAll(partitions)) { - indexesToDrop.add(entry.getKey()); - } - } else { - if (partitions.containsAll(entry.getValue())) { - indexesToDrop.add(entry.getKey()); - } - } - } - } - if (indexesToDrop.size() > 0) { - // Remove the indexes from partition map - for (String indexToDrop : indexesToDrop) { - partitionMap.remove(indexToDrop); - } - PartitionMapper mapper = new PartitionMapper(); - mapper.setPartitionMap(partitionMap); - String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp"; - writePartitionFile(mapper, path); - } - } - - /** - * It deletes the old partition mapper files in case of success. And in case of failure it removes - * the old new file. - * @param segmentPath - * @param uniqueId - * @param success - */ - public void commitPartitions(String segmentPath, final String uniqueId, boolean success, - String tablePath, List<String> partitionsToDrop) { - CarbonFile carbonFile = FileFactory - .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp"); - CarbonFile carbonPartFile = FileFactory - .getCarbonFile(tablePath + "/" + partitionsToDrop.get(0)); - // write partition info to new file. - if (carbonFile.exists()) { - if (success) { - carbonFile.renameForce(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT); - } else { - carbonFile.delete(); - } - } - //Remove the partition directory from table path - carbonPartFile.delete(); - } - - /** - * Clean up invalid data after drop partition in all segments of table - * @param table - * @param currentPartitions Current partitions of table - * @param forceDelete Whether it should be deleted force or check the time for an hour creation - * to delete data. - * @throws IOException - */ - public void cleanSegments( - CarbonTable table, - List<String> currentPartitions, - boolean forceDelete) throws IOException { - SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(), - table.getAbsoluteTableIdentifier().getCarbonTableIdentifier()); - - LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath()); - // scan through each segment. - List<String> segmentsNeedToBeDeleted = new ArrayList<>(); - for (LoadMetadataDetails segment : details) { - - // if this segment is valid then only we will go for deletion of related - // dropped partition files. if the segment is mark for delete or compacted then any way - // it will get deleted. 
- - if (segment.getSegmentStatus() == SegmentStatus.SUCCESS - || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { - List<String> toBeDeletedIndexFiles = new ArrayList<>(); - List<String> toBeDeletedDataFiles = new ArrayList<>(); - // take the list of files from this segment. - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); - String partitionFilePath = getPartitionFilePath(segmentPath); - if (partitionFilePath != null) { - PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); - if (partitionMapper.partitionMap.size() == 0) { - // There is no partition information, it means all partitions are dropped. - // So segment need to be marked as delete. - segmentsNeedToBeDeleted.add(segment.getLoadName()); - continue; - } - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); - indexFileStore.readAllIIndexOfSegment(segmentPath); - Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet(); - for (String indexFile : indexFilesFromSegment) { - // Check the partition information in the partiton mapper - List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile); - if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) { - Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile - .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, - indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length())); - if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) { - toBeDeletedIndexFiles.add(indexFile); - // Add the corresponding carbondata files to the delete list. - byte[] fileData = indexFileStore.getFileData(indexFile); - List<DataFileFooter> indexInfo = - fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData); - for (DataFileFooter footer : indexInfo) { - toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); - } - } - } - } - - if (toBeDeletedIndexFiles.size() > 0) { - indexFilesFromSegment.removeAll(toBeDeletedIndexFiles); - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath, - new ArrayList<String>(indexFilesFromSegment)); - for (String dataFile : toBeDeletedDataFiles) { - FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile)); - } - } - CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); - CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath); - if (partitionFiles != null) { - // Delete all old partition files - for (CarbonFile partitionFile : partitionFiles) { - if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) { - long fileTimeStamp = Long.parseLong(partitionFile.getName().substring(0, - partitionFile.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); - if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimeStamp) || forceDelete) { - partitionFile.delete(); - } - } - } - } - partitionMapper = readPartitionMap(partitionFilePath); - if (partitionMapper != null) { - // delete partition map if there is no partition files exist - if (partitionMapper.partitionMap.size() == 0) { - currentPartitionFile.delete(); - } - } - } - } - } - // If any segments that are required to delete - if (segmentsNeedToBeDeleted.size() > 0) { - try { - // Mark the segments as delete. 
- SegmentStatusManager.updateDeletionStatus( - table.getAbsoluteTableIdentifier(), - segmentsNeedToBeDeleted, - table.getMetaDataFilepath()); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - public List<String> getPartitions(String indexFileName) { - return partitionMap.get(indexFileName); - } - - public Map<String, List<String>> getPartitionMap() { - return partitionMap; - } - - public static class PartitionMapper implements Serializable { - - private static final long serialVersionUID = 3582245668420401089L; - - private Map<String, List<String>> partitionMap; - - public PartitionMapper merge(PartitionMapper mapper) { - if (this == mapper) { - return this; - } - if (partitionMap != null && mapper.partitionMap != null) { - partitionMap.putAll(mapper.partitionMap); - } - if (partitionMap == null) { - partitionMap = mapper.partitionMap; - } - return this; - } - - public Map<String, List<String>> getPartitionMap() { - return partitionMap; - } - - public void setPartitionMap(Map<String, List<String>> partitionMap) { - this.partitionMap = partitionMap; - } - } - -}
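For orientation before the replacement that follows: the removed PartitionMapFileStore above kept one small JSON file per task inside the segment folder (path built as segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT), and its content is simply the Gson serialization of the PartitionMapper class it declares. A minimal sketch of that shape is shown below; it is a fragment (assumes java.util.* and com.google.gson.Gson imports, as in the class above), and the index file name and partition values are invented purely for illustration:

    // Sketch only: reproduces the on-disk shape of the removed partition map file.
    // The index file name and partition values below are hypothetical.
    PartitionMapFileStore.PartitionMapper mapper = new PartitionMapFileStore.PartitionMapper();
    Map<String, List<String>> partitionMap = new HashMap<>();
    partitionMap.put("0_batchno0-0-1517213654321.carbonindex",
        Arrays.asList("country=india", "year=2017"));
    mapper.setPartitionMap(partitionMap);
    // Serializing it the same way writePartitionFile did yields roughly:
    // {"partitionMap":{"0_batchno0-0-1517213654321.carbonindex":["country=india","year=2017"]}}
    String json = new Gson().toJson(mapper);

The SegmentFileStore added in the next file replaces this per-task index-file-to-partitions map with a single segment file (CarbonTablePath.SEGMENT_EXT) whose locationMap records, per folder location, the index files, the partition values, a status, and an optional merge-index file name; the isRelative flag in FolderDetails marks whether a location sits under the table path or is an absolute external path.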
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java new file mode 100644 index 0000000..b5f5a25 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile segmentFile; + + /** + * Here key folder path and values are index files in it. 
+ */ + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + public SegmentFileStore(String tablePath, String segmentFileName) throws IOException { + this.tablePath = tablePath; + this.segmentFile = readSegment(tablePath, segmentFileName); + } + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. + */ + public static void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. + writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + brWriter.flush(); + } finally { + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. 
+ * + * @throws IOException + */ + public static SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, + String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private static CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file which is written in json format + * + * @param segmentFilePath + * @return + */ + private static SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, 
FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + private SegmentFile readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + return readSegmentFile(segmentFilePath); + } + + public String getTablePath() { + return tablePath; + } + + /** + * Gets all the index files and related carbondata files from this segment. First user needs to + * call @readIndexFiles method before calling it. + * @return + */ + public Map<String, List<String>> getIndexFilesMap() { + return indexFilesMap; + } + + /** + * Reads all index files which are located in this segment. First user needs to call + * @readSegment method before calling it. + * @throws IOException + */ + public void readIndexFiles() throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false); + } + + /** + * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just + * reads all index files + * @param status + * @param ignoreStatus + * @throws IOException + */ + private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException { + if (indexFilesMap != null) { + return; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFilesMap = new HashMap<>(); + indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<String> blocks = new ArrayList<>(); + for (DataFileFooter footer : indexInfo) { + blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + indexFilesMap.put(entry.getKey(), blocks); + } + } + + /** + * Gets all index files from this segment + * @return + */ + public Map<String, String> getIndexFiles() { + Map<String, String> indexFiles = new HashMap<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { + for (String indexFile : entry.getValue().getFiles()) { + indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile, + entry.getValue().mergeFileName); + } + } + } + } + return indexFiles; + } + + /** + * Drops the partition related files from the segment file of the segment and writes + * to a new file. First iterator over segment file and check the path it needs to be dropped. + * And update the status with delete if it found. 
+ * + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs, + String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments) + throws IOException { + readSegment(tablePath, segment.getSegmentFileName()); + boolean updateSegment = false; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + Path path = new Path(location); + // Update the status to delete if path equals + for (PartitionSpec spec : partitionSpecs) { + if (path.equals(spec.getLocation())) { + entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage()); + updateSegment = true; + break; + } + } + } + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); + writePath = + writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId + + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, writePath); + // Check whether we can completly remove the segment. + boolean deleteSegment = true; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + deleteSegment = false; + break; + } + } + if (deleteSegment) { + toBeDeletedSegments.add(segment.getSegmentNo()); + } + if (updateSegment) { + toBeUpdatedSegments.add(segment.getSegmentNo()); + } + } + + /** + * Update the table status file with the dropped partitions information + * + * @param carbonTable + * @param uniqueId + * @param toBeUpdatedSegments + * @param toBeDeleteSegments + * @throws IOException + */ + public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId, + List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException { + if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) { + Set<Segment> segmentSet = new HashSet<>( + new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()) + .getValidAndInvalidSegments().getValidSegments()); + CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true, + Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments)); + } + } + + /** + * Clean up invalid data after drop partition in all segments of table + * + * @param table + * @param forceDelete Whether it should be deleted force or check the time for an hour creation + * to delete data. + * @throws IOException + */ + public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs, + boolean forceDelete) throws IOException { + + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath()); + // scan through each segment. + for (LoadMetadataDetails segment : details) { + // if this segment is valid then only we will go for deletion of related + // dropped partition files. if the segment is mark for delete or compacted then any way + // it will get deleted. + + if ((segment.getSegmentStatus() == SegmentStatus.SUCCESS + || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) + && segment.getSegmentFile() != null) { + List<String> toBeDeletedIndexFiles = new ArrayList<>(); + List<String> toBeDeletedDataFiles = new ArrayList<>(); + // take the list of files from this segment. 
+ SegmentFileStore fileStore = + new SegmentFileStore(table.getTablePath(), segment.getSegmentFile()); + fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false); + if (forceDelete) { + deletePhysicalPartition(partitionSpecs, fileStore.getIndexFilesMap()); + } + for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) { + String indexFile = entry.getKey(); + // Check the partition information in the partiton mapper + Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile + .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length())); + if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) { + toBeDeletedIndexFiles.add(indexFile); + // Add the corresponding carbondata files to the delete list. + toBeDeletedDataFiles.addAll(entry.getValue()); + } + } + if (toBeDeletedIndexFiles.size() > 0) { + for (String dataFile : toBeDeletedIndexFiles) { + FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile)); + } + for (String dataFile : toBeDeletedDataFiles) { + FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile)); + } + } + } + } + } + + /** + * Deletes the segment file and its physical files like partition folders from disk + * @param tablePath + * @param segmentFile + * @param partitionSpecs + * @throws IOException + */ + public static void deleteSegment(String tablePath, String segmentFile, + List<PartitionSpec> partitionSpecs) throws IOException { + SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile); + fileStore.readIndexFiles(SegmentStatus.SUCCESS, true); + Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); + for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey())); + for (String file : entry.getValue()) { + FileFactory.deleteFile(file, FileFactory.getFileType(file)); + } + } + deletePhysicalPartition(partitionSpecs, indexFilesMap); + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFile; + // Deletes the physical segment file + FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + } + + private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, + Map<String, List<String>> locationMap) { + for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) { + Path location = new Path(entry.getKey()).getParent(); + boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); + if (!exists) { + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString())); + } + } + } + + private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs, + Path partitionPath) { + for (PartitionSpec spec : partitionSpecs) { + if (spec.getLocation().equals(partitionPath)) { + return true; + } + } + return false; + } + + /** + * Get the partition specs of the segment + * @param segmentId + * @param tablePath + * @return + * @throws IOException + */ + public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath) + throws IOException { + LoadMetadataDetails segEntry = null; + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath)); + for (LoadMetadataDetails entry : details) { + if (entry.getLoadName().equals(segmentId)) { + segEntry = entry; + break; 
+ } + } + if (segEntry != null && segEntry.getSegmentFile() != null) { + SegmentFileStore fileStore = new SegmentFileStore(tablePath, segEntry.getSegmentFile()); + List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs(); + for (PartitionSpec spec : partitionSpecs) { + spec.setUuid(segmentId + "_" + segEntry.getLoadStartTime()); + } + return partitionSpecs; + } + return null; + } + + /** + * Move the loaded data from temp folder to respective partition folder. + * @param segmentFile + * @param tmpFolder + * @param tablePath + */ + public static void moveFromTempFolder(SegmentFile segmentFile, String tmpFolder, + String tablePath) { + + for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : segmentFile.getLocationMap() + .entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative()) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + CarbonFile oldFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder); + CarbonFile[] oldFiles = oldFolder.listFiles(); + for (CarbonFile file : oldFiles) { + file.renameForce(location + CarbonCommonConstants.FILE_SEPARATOR + file.getName()); + } + oldFolder.delete(); + } + } + + /** + * Remove temp stage folder in case of job aborted. + * + * @param locationMap + * @param tmpFolder + * @param tablePath + */ + public static void removeTempFolder(Map<String, FolderDetails> locationMap, String tmpFolder, + String tablePath) { + if (locationMap == null) { + return; + } + for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative()) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + CarbonFile oldFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder); + if (oldFolder.exists()) { + FileFactory.deleteAllCarbonFilesOfDir(oldFolder); + } + } + } + + /** + * Returns content of segment + * @return + */ + public Map<String, FolderDetails> getLocationMap() { + if (segmentFile == null) { + return new HashMap<>(); + } + return segmentFile.getLocationMap(); + } + + /** + * Returs the current partition specs of this segment + * @return + */ + public List<PartitionSpec> getPartitionSpecs() { + List<PartitionSpec> partitionSpecs = new ArrayList<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + partitionSpecs.add(new PartitionSpec(entry.getValue().partitions, location)); + } + } + } + return partitionSpecs; + } + + /** + * It contains the segment information like location, partitions and related index files + */ + public static class SegmentFile implements Serializable { + + private static final long serialVersionUID = 3582245668420401089L; + + private Map<String, FolderDetails> locationMap; + + public SegmentFile merge(SegmentFile mapper) { + if (this == mapper) { + return this; + } + if (locationMap != null && mapper.locationMap != null) { + for (Map.Entry<String, FolderDetails> entry : mapper.locationMap.entrySet()) { + FolderDetails folderDetails = locationMap.get(entry.getKey()); + if (folderDetails != null) { + folderDetails.merge(entry.getValue()); + } else { + 
locationMap.put(entry.getKey(), entry.getValue()); + } + } + } + if (locationMap == null) { + locationMap = mapper.locationMap; + } + return this; + } + + public Map<String, FolderDetails> getLocationMap() { + return locationMap; + } + + public void setLocationMap(Map<String, FolderDetails> locationMap) { + this.locationMap = locationMap; + } + } + + /** + * Represents one partition folder + */ + public static class FolderDetails implements Serializable { + + private static final long serialVersionUID = 501021868886928553L; + + private Set<String> files = new HashSet<>(); + + private List<String> partitions = new ArrayList<>(); + + private String status; + + private String mergeFileName; + + private boolean isRelative; + + public FolderDetails merge(FolderDetails folderDetails) { + if (this == folderDetails || folderDetails == null) { + return this; + } + if (folderDetails.files != null) { + files.addAll(folderDetails.files); + } + if (files == null) { + files = folderDetails.files; + } + partitions = folderDetails.partitions; + return this; + } + + public Set<String> getFiles() { + return files; + } + + public void setFiles(Set<String> files) { + this.files = files; + } + + public List<String> getPartitions() { + return partitions; + } + + public void setPartitions(List<String> partitions) { + this.partitions = partitions; + } + + public boolean isRelative() { + return isRelative; + } + + public void setRelative(boolean relative) { + isRelative = relative; + } + + public String getMergeFileName() { + return mergeFileName; + } + + public void setMergeFileName(String mergeFileName) { + this.mergeFileName = mergeFileName; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index c5f61c2..de98fa8 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -82,14 +83,16 @@ public class CarbonUpdateUtil { * @param factPath * @return */ - public static String getTableBlockPath(String tid, String factPath) { - String part = - CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID)); + public static String getTableBlockPath(String tid, String factPath, boolean isPartitionTable) { + String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID); + if (isPartitionTable) { + return factPath + CarbonCommonConstants.FILE_SEPARATOR + partField; + } + String part = CarbonTablePath.addPartPrefix(partField); String segment = CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID)); return factPath + 
CarbonCommonConstants.FILE_SEPARATOR + part + CarbonCommonConstants.FILE_SEPARATOR + segment; - } /** @@ -172,6 +175,22 @@ public class CarbonUpdateUtil { } /** + * Update table status + * @param updatedSegmentsList + * @param table + * @param updatedTimeStamp + * @param isTimestampUpdationRequired + * @param segmentsToBeDeleted + * @return + */ + public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList, + CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired, + List<Segment> segmentsToBeDeleted) { + return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp, + isTimestampUpdationRequired, segmentsToBeDeleted, new ArrayList<Segment>()); + } + + /** * * @param updatedSegmentsList * @param table @@ -180,10 +199,9 @@ public class CarbonUpdateUtil { * @param segmentsToBeDeleted * @return */ - public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList, - CarbonTable table, String updatedTimeStamp, - boolean isTimestampUpdationRequired, - List<String> segmentsToBeDeleted) { + public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList, + CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired, + List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) { boolean status = false; @@ -221,13 +239,13 @@ public class CarbonUpdateUtil { } // if the segments is in the list of marked for delete then update the status. - if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) { + if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) { loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp)); } } - for (String segName : updatedSegmentsList) { - if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) { + for (Segment segName : updatedSegmentsList) { + if (loadMetadata.getLoadName().equalsIgnoreCase(segName.getSegmentNo())) { // if this call is coming from the delete delta flow then the time stamp // String will come empty then no need to write into table status file. if (isTimestampUpdationRequired) { @@ -240,6 +258,10 @@ public class CarbonUpdateUtil { // update end timestamp for each time. 
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp); } + if (segmentFilesTobeUpdated.contains(Segment.toSegment(loadMetadata.getLoadName()))) { + loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp + + CarbonTablePath.SEGMENT_EXT); + } } } } @@ -696,14 +718,14 @@ public class CarbonUpdateUtil { * @param segmentBlockCount * @return */ - public static List<String> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) { - List<String> segmentsToBeDeleted = + public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) { + List<Segment> segmentsToBeDeleted = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) { if (eachSeg.getValue() == 0) { - segmentsToBeDeleted.add(eachSeg.getKey()); + segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null)); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 6490694..cc2e513 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -74,7 +74,6 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.commons.lang3.ArrayUtils; @@ -269,7 +268,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, abstractIndex, dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(), dataRefNode.numberOfNodes(), dataRefNode.getBlockInfos().get(0).getFilePath(), - dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath())); + dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(), + dataRefNode.getBlockInfos().get(0).getSegmentId())); } if (null != queryModel.getStatisticsRecorder()) { QueryStatistic queryStatistic = new QueryStatistic(); @@ -291,7 +291,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { */ protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath, - String[] deleteDeltaFiles) + String[] deleteDeltaFiles, String segmentId) throws QueryExecutionException { BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); SegmentProperties segmentProperties = blockIndex.getSegmentProperties(); @@ -304,11 +304,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryModel.getQueryDimension(), tableBlockDimensions, segmentProperties.getComplexDimensions()); - int tableFactPathLength = CarbonStorePath - .getCarbonTablePath(queryModel.getAbsoluteTableIdentifier().getTablePath(), - 
queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir() - .length() + 1; - blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength)); + blockExecutionInfo.setBlockId( + CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId)); blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles); blockExecutionInfo.setStartBlockletIndex(startBlockletIndex); blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan); @@ -457,6 +454,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { return blockExecutionInfo; } + + /** * This method will be used to get fixed key length size this will be used * to create a row from column chunk http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index 85602bc..a0fa67d 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -123,6 +123,11 @@ public class LoadMetadataDetails implements Serializable { */ private FileFormat fileFormat = FileFormat.COLUMNAR_V3; + /** + * Segment file name where it has the information of partition information. + */ + private String segmentFile; + public String getPartitionCount() { return partitionCount; } @@ -417,4 +422,17 @@ public class LoadMetadataDetails implements Serializable { public void setFileFormat(FileFormat fileFormat) { this.fileFormat = fileFormat; } + + public String getSegmentFile() { + return segmentFile; + } + + public void setSegmentFile(String segmentFile) { + this.segmentFile = segmentFile; + } + + @Override public String toString() { + return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\'' + + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}'; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 9d14c62..76c2dc7 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -33,6 +33,7 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; @@ -99,13 +100,14 @@ public class SegmentStatusManager { public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException { // @TODO: move reading LoadStatus file to separate class - List<String> 
listOfValidSegments = new ArrayList<>(10); - List<String> listOfValidUpdatedSegments = new ArrayList<>(10); - List<String> listOfInvalidSegments = new ArrayList<>(10); - List<String> listOfStreamSegments = new ArrayList<>(10); + List<Segment> listOfValidSegments = new ArrayList<>(10); + List<Segment> listOfValidUpdatedSegments = new ArrayList<>(10); + List<Segment> listOfInvalidSegments = new ArrayList<>(10); + List<Segment> listOfStreamSegments = new ArrayList<>(10); + List<Segment> listOfInProgressSegments = new ArrayList<>(10); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); String dataPath = carbonTablePath.getTableStatusFilePath(); DataInputStream dataInputStream = null; @@ -113,7 +115,7 @@ public class SegmentStatusManager { Gson gson = new Gson(); AtomicFileOperations fileOperation = - new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath)); + new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath)); LoadMetadataDetails[] loadFolderDetailsArray; try { if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) { @@ -127,37 +129,44 @@ public class SegmentStatusManager { } //just directly iterate Array for (LoadMetadataDetails segment : loadFolderDetailsArray) { - if (SegmentStatus.SUCCESS == segment.getSegmentStatus() || - SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() || - SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() || - SegmentStatus.STREAMING == segment.getSegmentStatus() || - SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { + if (SegmentStatus.SUCCESS == segment.getSegmentStatus() + || SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() + || SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() + || SegmentStatus.STREAMING == segment.getSegmentStatus() + || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { // check for merged loads. 
if (null != segment.getMergedLoadName()) { - if (!listOfValidSegments.contains(segment.getMergedLoadName())) { - listOfValidSegments.add(segment.getMergedLoadName()); + Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile()); + if (!listOfValidSegments.contains(seg)) { + listOfValidSegments.add(seg); } // if merged load is updated then put it in updated list if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) { - listOfValidUpdatedSegments.add(segment.getMergedLoadName()); + listOfValidUpdatedSegments.add(seg); } continue; } if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) { - listOfValidUpdatedSegments.add(segment.getLoadName()); + listOfValidUpdatedSegments + .add(new Segment(segment.getLoadName(), segment.getSegmentFile())); } - if (SegmentStatus.STREAMING == segment.getSegmentStatus() || - SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { - listOfStreamSegments.add(segment.getLoadName()); + if (SegmentStatus.STREAMING == segment.getSegmentStatus() + || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) { + listOfStreamSegments + .add(new Segment(segment.getLoadName(), segment.getSegmentFile())); continue; } - listOfValidSegments.add(segment.getLoadName()); - } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() || - SegmentStatus.COMPACTED == segment.getSegmentStatus() || - SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) { - listOfInvalidSegments.add(segment.getLoadName()); + listOfValidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile())); + } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() + || SegmentStatus.COMPACTED == segment.getSegmentStatus() + || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) { + listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile())); + } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() || + SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) { + listOfInProgressSegments + .add(new Segment(segment.getLoadName(), segment.getSegmentFile())); } } } @@ -168,7 +177,7 @@ public class SegmentStatusManager { CarbonUtil.closeStreams(dataInputStream); } return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments, - listOfInvalidSegments, listOfStreamSegments); + listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments); } /** @@ -688,29 +697,35 @@ public class SegmentStatusManager { public static class ValidAndInvalidSegmentsInfo { - private final List<String> listOfValidSegments; - private final List<String> listOfValidUpdatedSegments; - private final List<String> listOfInvalidSegments; - private final List<String> listOfStreamSegments; - - private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments, - List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments, - List<String> listOfStreamSegments) { + private final List<Segment> listOfValidSegments; + private final List<Segment> listOfValidUpdatedSegments; + private final List<Segment> listOfInvalidSegments; + private final List<Segment> listOfStreamSegments; + private final List<Segment> listOfInProgressSegments; + + private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments, + List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments, + List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) { this.listOfValidSegments = listOfValidSegments; 
this.listOfValidUpdatedSegments = listOfValidUpdatedSegments; this.listOfInvalidSegments = listOfInvalidUpdatedSegments; this.listOfStreamSegments = listOfStreamSegments; + this.listOfInProgressSegments = listOfInProgressSegments; } - public List<String> getInvalidSegments() { + public List<Segment> getInvalidSegments() { return listOfInvalidSegments; } - public List<String> getValidSegments() { + public List<Segment> getValidSegments() { return listOfValidSegments; } - public List<String> getStreamSegments() { + public List<Segment> getStreamSegments() { return listOfStreamSegments; } + + public List<Segment> getListOfInProgressSegments() { + return listOfInProgressSegments; + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index e0e7b70..71b6ba8 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -33,6 +33,7 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -70,6 +71,7 @@ public class SegmentUpdateStatusManager { private SegmentUpdateDetails[] updateDetails; private CarbonTablePath carbonTablePath; private Map<String, SegmentUpdateDetails> blockAndDetailsMap; + private boolean isPartitionTable; /** * @param absoluteTableIdentifier @@ -83,6 +85,9 @@ public class SegmentUpdateStatusManager { // on latest file status. 
segmentDetails = segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + if (segmentDetails.length > 0) { + isPartitionTable = segmentDetails[0].getSegmentFile() != null; + } updateDetails = readLoadMetadata(); populateMap(); } @@ -268,12 +273,14 @@ public class SegmentUpdateStatusManager { * @return all delete delta files * @throws Exception */ - public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception { - int tableFactPathLength = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1; - String blockame = blockFilePath.substring(tableFactPathLength); - String tupleId = CarbonTablePath.getShortBlockId(blockame); + public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception { + String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId); + String tupleId; + if (isPartitionTable) { + tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId); + } else { + tupleId = CarbonTablePath.getShortBlockId(blockId); + } return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT) .toArray(new String[0]); } @@ -292,12 +299,19 @@ public class SegmentUpdateStatusManager { .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment); String completeBlockName = CarbonTablePath.addDataPartPrefix( CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID) + CarbonCommonConstants.FACT_FILE_EXT); - String blockPath = - carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + String blockPath; + if (isPartitionTable) { + blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) + .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + } else { + String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment); + blockPath = + carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + } CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath)); if (!file.exists()) { throw new Exception("Invalid tuple id " + tupleId); @@ -306,8 +320,7 @@ public class SegmentUpdateStatusManager { //blockName without timestamp final String blockNameFromTuple = blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-")); - return getDeltaFiles(file, blockNameFromTuple, extension, - segment); + return getDeltaFiles(file, blockNameFromTuple, extension, segment); } catch (Exception ex) { String errorMsg = "Invalid tuple id " + tupleId; LOG.error(errorMsg); @@ -418,20 +431,20 @@ public class SegmentUpdateStatusManager { * @param blockName * @return */ - public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final String blockName) { + public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String 
segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); for (SegmentUpdateDetails block : updateDetails) { if ((block.getBlockName().equalsIgnoreCase(blockName)) && - (block.getSegmentName().equalsIgnoreCase(segmentId)) + (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo())) && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) { final long deltaStartTimestamp = getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index c208154..eb0a9d7 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -47,6 +47,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.FileHolder; import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.block.TableBlockInfo; @@ -63,6 +64,7 @@ import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.SegmentInfo; @@ -1437,14 +1439,13 @@ public final class CarbonUtil { * @param values * @return comma separated segment string */ - public static String convertToString(List<String> values) { + public static String convertToString(List<Segment> values) { if (values == null || values.isEmpty()) { return ""; } StringBuilder segmentStringbuilder = new StringBuilder(); for (int i = 0; i < values.size() - 1; i++) { - String segmentNo = values.get(i); - segmentStringbuilder.append(segmentNo); + segmentStringbuilder.append(values.get(i)); segmentStringbuilder.append(","); } segmentStringbuilder.append(values.get(values.size() - 1)); @@ -2145,7 +2146,8 @@ public final class CarbonUtil { } for (String value : values) { if (!value.equalsIgnoreCase("*")) { - Float aFloatValue = Float.parseFloat(value); + Segment segment = Segment.toSegment(value); + Float aFloatValue = Float.parseFloat(segment.getSegmentNo()); if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) { throw new InvalidConfigurationException( "carbon.input.segments.<database_name>.<table_name> value range should be greater " @@ -2298,7 +2300,7 @@ public final class CarbonUtil { } // Get the total size of carbon data and the total size of carbon index - public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath, + private static HashMap<String, Long> 
getDataSizeAndIndexSize(CarbonTablePath carbonTablePath, String segmentId) throws IOException { long carbonDataSize = 0L; long carbonIndexSize = 0L; @@ -2350,6 +2352,51 @@ public final class CarbonUtil { return dataAndIndexSize; } + // Get the total size of carbon data and the total size of carbon index + private static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore fileStore) + throws IOException { + long carbonDataSize = 0L; + long carbonIndexSize = 0L; + HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); + if (fileStore.getLocationMap() != null) { + fileStore.readIndexFiles(); + Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); + for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize(); + for (String blockFile : entry.getValue()) { + carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize(); + } + } + } + dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE, carbonDataSize); + dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE, carbonIndexSize); + return dataAndIndexSize; + } + + // Get the total size of carbon data and the total size of carbon index + public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath, + Segment segment) throws IOException { + if (segment.getSegmentFileName() != null) { + SegmentFileStore fileStore = + new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName()); + return getDataSizeAndIndexSize(fileStore); + } else { + return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo()); + } + } + + // Get the total size of segment. + public static long getSizeOfSegment(CarbonTablePath carbonTablePath, + Segment segment) throws IOException { + HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment); + long size = 0; + for (Long eachSize: dataSizeAndIndexSize.values()) { + size += eachSize; + } + return size; + } + + /** * Utility function to check whether table has timeseries datamap or not * @param carbonTable @@ -2449,5 +2496,43 @@ public final class CarbonUtil { return updatedMinMaxValues; } + /** + * Generate the block id from the block path + * + * @param identifier + * @param filePath + * @param segmentId + * @return + */ + public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath, + String segmentId) { + String blockId; + String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length()); + String tablePath = identifier.getTablePath(); + if (filePath.startsWith(tablePath)) { + String factDir = + CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier()) + .getFactDir(); + if (filePath.startsWith(factDir)) { + blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId + + CarbonCommonConstants.FILE_SEPARATOR + blockName; + } else { + // This is the case with partition table. + String partitionDir = + filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1); + + // Replace / with # in the partition directory to support multi-level partitioning and + // to access all levels as a single entity.
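To make the '#' flattening described in the comment above concrete, here is a small self-contained sketch (not part of this patch) that mirrors the partition branch of getBlockId; the table path, partition directories and file name are made up for illustration only:

public final class BlockIdExample {
  public static void main(String[] args) {
    // Hypothetical inputs, for illustration only.
    String tablePath = "/store/default/sales";
    String filePath = tablePath + "/country=US/year=2017/part-0-0_batchno0-0-0-1.carbondata";
    String segmentId = "0";

    String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
    // Directory between the table path and the data file, e.g. "country=US/year=2017".
    String partitionDir = filePath.substring(tablePath.length() + 1,
        filePath.length() - blockName.length() - 1);
    // Multi-level partition directories are flattened with '#', as in the method above.
    String blockId = partitionDir.replace("/", "#") + "/Segment_" + segmentId + "/" + blockName;
    // Prints: country=US#year=2017/Segment_0/part-0-0_batchno0-0-0-1.carbondata
    System.out.println(blockId);
  }
}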
+ blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName; + } + } else { + blockId = filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#") + + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId + + CarbonCommonConstants.FILE_SEPARATOR + blockName; + } + return blockId; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java index d4b328d..f1603dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.util; import java.io.Serializable; +import java.math.BigDecimal; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -26,18 +27,33 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable { private static final long serialVersionUID = -1718154403432354200L; public Object convertToDecimal(Object data) { - return new java.math.BigDecimal(data.toString()); + if (null == data) { + return null; + } + if (data instanceof BigDecimal) { + return data; + } + return new BigDecimal(data.toString()); } public Object convertFromByteToUTF8String(Object data) { + if (null == data) { + return null; + } return new String((byte[]) data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS); } public byte[] convertFromStringToByte(Object data) { + if (null == data) { + return null; + } return data.toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); } public Object convertFromStringToUTF8String(Object data) { + if (null == data) { + return null; + } return data.toString(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index c370b14..4602cc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -83,25 +83,7 @@ public final class DataTypeUtil { */ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, CarbonMeasure carbonMeasure) { - if (dataType == DataTypes.BOOLEAN) { - return BooleanConvert.parseBoolean(msrValue); - } else if (DataTypes.isDecimal(dataType)) { - BigDecimal bigDecimal = - new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP); - return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()); - } else if (dataType == DataTypes.SHORT) { - return Short.parseShort(msrValue); - } else if (dataType == DataTypes.INT) { - return Integer.parseInt(msrValue); - } else if (dataType == DataTypes.LONG) { - return Long.valueOf(msrValue); - } else { - Double parsedValue = Double.valueOf(msrValue); - if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) { - return null; - } - return parsedValue; - } + return 
getMeasureValueBasedOnDataType(msrValue, dataType,carbonMeasure, false); } /** @@ -112,15 +94,19 @@ public final class DataTypeUtil { * @param carbonMeasure * @return */ - public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType, - CarbonMeasure carbonMeasure) { + public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, + CarbonMeasure carbonMeasure, boolean useConverter) { if (dataType == DataTypes.BOOLEAN) { return BooleanConvert.parseBoolean(msrValue); } else if (DataTypes.isDecimal(dataType)) { BigDecimal bigDecimal = new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP); - return converter - .convertToDecimal(normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision())); + BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()); + if (useConverter) { + return converter.convertToDecimal(decimal); + } else { + return decimal; + } } else if (dataType == DataTypes.SHORT) { return Short.parseShort(msrValue); } else if (dataType == DataTypes.INT) { @@ -815,6 +801,8 @@ public final class DataTypeUtil { */ public static void setDataTypeConverter(DataTypeConverter converterLocal) { converter = converterLocal; + timeStampformatter.remove(); + dateformatter.remove(); } public static DataTypeConverter getDataTypeConverter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 5a63d2f..d70d9ef 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -48,7 +48,7 @@ public class CarbonTablePath extends Path { public static final String CARBON_DATA_EXT = ".carbondata"; public static final String INDEX_FILE_EXT = ".carbonindex"; public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge"; - public static final String PARTITION_MAP_EXT = ".partitionmap"; + public static final String SEGMENT_EXT = ".segment"; private static final String STREAMING_DIR = ".streaming"; private static final String STREAMING_LOG_DIR = "log"; @@ -111,17 +111,6 @@ public class CarbonTablePath extends Path { } /** - * Return true if the fileNameWithPath ends with partition map file extension name - */ - public static boolean isPartitionMapFile(String fileNameWithPath) { - int pos = fileNameWithPath.lastIndexOf('.'); - if (pos != -1) { - return fileNameWithPath.substring(pos).startsWith(PARTITION_MAP_EXT); - } - return false; - } - - /** * check if it is carbon index file matching extension * * @param fileNameWithPath @@ -667,6 +656,18 @@ public class CarbonTablePath extends Path { } /** + * This method will remove strings in path and return short block id + * + * @param blockId + * @return shortBlockId + */ + public static String getShortBlockIdForPartitionTable(String blockId) { + return blockId.replace(SEGMENT_PREFIX, "") + .replace(DATA_PART_PREFIX, "") + .replace(CARBON_DATA_EXT, ""); + } + + /** * This method will append strings in path and return block id * * @param shortBlockId @@ -735,4 +736,12 @@ public class CarbonTablePath extends Path { public static String getSegmentPath(String tablePath, String segmentId) { return tablePath + 
"/Fact/Part0/Segment_" + segmentId; } + + /** + * Get the segment file locations of table + */ + public static String getSegmentFilesLocation(String tablePath) { + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments"; + } + } \ No newline at end of file