Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1984#discussion_r169544627 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java --- @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.metadata; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; + +/** + * Provide read and write support for segment file associated with each segment + */ +public class SegmentFileStore { + + private SegmentFile 
segmentFile; + + private Map<String, List<String>> indexFilesMap; + + private String tablePath; + + /** + * Write segment information to the segment folder with indexfilename and + * corresponding partitions. + */ + public void writeSegmentFile(String tablePath, final String taskNo, String location, + String timeStamp, List<String> partionNames) throws IOException { + String tempFolderLoc = timeStamp + ".tmp"; + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc; + CarbonFile carbonFile = FileFactory.getCarbonFile(writePath); + if (!carbonFile.exists()) { + carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath)); + } + CarbonFile tempFolder = + FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + if (tempFolder.exists() && partionNames.size() > 0) { + CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().startsWith(taskNo) && file.getName() + .endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (carbonFiles != null && carbonFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(partionNames); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : carbonFiles) { + folderDetails.getFiles().add(file.getName()); + } + locationMap.put(location, folderDetails); + segmentFile.setLocationMap(locationMap); + String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, path); + } + } + } + + /** + * Writes the segment file in json format + * @param segmentFile + * @param path + * @throws IOException + */ + public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException { + AtomicFileOperations fileWrite = + new AtomicFileOperationsImpl(path, FileFactory.getFileType(path)); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + Gson gsonObjectToWrite = new Gson(); + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + + String metadataInstance = gsonObjectToWrite.toJson(segmentFile); + brWriter.write(metadataInstance); + } finally { + if (null != brWriter) { + brWriter.flush(); + } + CarbonUtil.closeStreams(brWriter); + fileWrite.close(); + } + } + + /** + * Merge all segment files in a segment to single file. + * + * @param writePath + * @throws IOException + */ + public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath) + throws IOException { + CarbonFile[] segmentFiles = getSegmentFiles(readPath); + if (segmentFiles != null && segmentFiles.length > 0) { + SegmentFile segmentFile = null; + for (CarbonFile file : segmentFiles) { + SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath()); + if (segmentFile == null && localSegmentFile != null) { + segmentFile = localSegmentFile; + } + if (localSegmentFile != null) { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + if (segmentFile != null) { + String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, path); + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)); + } + return segmentFile; + } + return null; + } + + private CarbonFile[] getSegmentFiles(String segmentPath) { + CarbonFile carbonFile = 
FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT); + } + }); + } + return null; + } + + /** + * It provides segment file only for the partitions which has physical index files. + * + * @param partitionSpecs + */ + public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, + List<PartitionSpec> partitionSpecs) { + SegmentFile segmentFile = null; + for (PartitionSpec spec : partitionSpecs) { + String location = spec.getLocation().toString(); + CarbonFile carbonFile = FileFactory.getCarbonFile(location); + boolean isRelative = false; + if (location.startsWith(tablePath)) { + location = location.substring(tablePath.length(), location.length()); + isRelative = true; + } + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()); + } + }); + if (listFiles != null && listFiles.length > 0) { + SegmentFile localSegmentFile = new SegmentFile(); + Map<String, FolderDetails> locationMap = new HashMap<>(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(isRelative); + folderDetails.setPartitions(spec.getPartitions()); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : listFiles) { + if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + folderDetails.setMergeFileName(file.getName()); + } else { + folderDetails.getFiles().add(file.getName()); + } + } + locationMap.put(location, folderDetails); + localSegmentFile.setLocationMap(locationMap); + if (segmentFile == null) { + segmentFile = localSegmentFile; + } else { + segmentFile = segmentFile.merge(localSegmentFile); + } + } + } + return segmentFile; + } + + /** + * This method reads the segment file 
which is written in json format + * + * @param segmentFilePath + * @return + */ + private SegmentFile readSegmentFile(String segmentFilePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + SegmentFile segmentFile; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath)); + + try { + if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class); + } finally { + if (inStream != null) { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + } + + return segmentFile; + } + + /** + * Reads segment file. + */ + public void readSegment(String tablePath, String segmentFileName) throws IOException { + String segmentFilePath = + CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + segmentFileName; + SegmentFile segmentFile = readSegmentFile(segmentFilePath); + this.tablePath = tablePath; + this.segmentFile = segmentFile; + } + + public String getTablePath() { + return tablePath; + } + + /** + * Gets all the index files and related carbondata files from this segment. First user needs to + * call @readIndexFiles method before calling it. + * @return + */ + public Map<String, List<String>> getIndexFilesMap() { + return indexFilesMap; + } + + /** + * Reads all index files which are located in this segment. First user needs to call + * @readSegment method before calling it. 
+ * @throws IOException + */ + public void readIndexFiles() throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false); + } + + /** + * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just + * reads all index files + * @param status + * @param ignoreStatus + * @throws IOException + */ + private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException { + if (indexFilesMap != null) { + return; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFilesMap = new HashMap<>(); + indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<String> blocks = new ArrayList<>(); + for (DataFileFooter footer : indexInfo) { + blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + indexFilesMap.put(entry.getKey(), blocks); + } + } + + /** + * Gets all index files from this segment + * @return + */ + public Map<String, String> getIndexFiles() { + Map<String, String> indexFiles = new HashMap<>(); + if (segmentFile != null) { + for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { + for (String indexFile : entry.getValue().getFiles()) { + indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile, + entry.getValue().mergeFileName); + } + } + } + } + return indexFiles; + } + + /** + * Drops the partition related files from the segment file of the 
segment and writes + * to a new file. First iterator over segment file and check the path it needs to be dropped. + * And update the status with delete if it found. + * + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, + String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments) + throws IOException { + readSegment(tablePath, segment.getSegmentFileName()); + boolean updateSegment = false; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + String location = entry.getKey(); + if (entry.getValue().isRelative) { + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + } + Path path = new Path(location); + // Update the status to delete if path equals + for (PartitionSpec spec : partitionSpecs) { + if (path.equals(spec.getLocation())) { + entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage()); + updateSegment = true; + break; + } + } + } + String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); + writePath = + writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId + + CarbonTablePath.SEGMENT_EXT; + writeSegmentFile(segmentFile, writePath); + // Check whether we can completly remove the segment. + boolean deleteSegment = true; + for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) { + if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) { + deleteSegment = false; --- End diff -- Break out of the loop as soon as deleteSegment is set to false; once one SUCCESS entry is found, the remaining entries do not need to be checked.
---