Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1984#discussion_r169629127
  
    --- Diff: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.core.metadata;
    +
    +import java.io.BufferedReader;
    +import java.io.BufferedWriter;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStreamWriter;
    +import java.io.Serializable;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
    +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataFileFooterConverter;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +
    +import com.google.gson.Gson;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * Provide read and write support for segment file associated with each 
segment
    + */
    +public class SegmentFileStore {
    +
    +  private SegmentFile segmentFile;
    +
    +  private Map<String, List<String>> indexFilesMap;
    +
    +  private String tablePath;
    +
    +  /**
    +   * Write segment information to the segment folder with indexfilename and
    +   * corresponding partitions.
    +   */
    +  public void writeSegmentFile(String tablePath, final String taskNo, 
String location,
    +      String timeStamp, List<String> partionNames) throws IOException {
    +    String tempFolderLoc = timeStamp + ".tmp";
    +    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) 
+ "/" + tempFolderLoc;
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    +    if (!carbonFile.exists()) {
    +      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    +    }
    +    CarbonFile tempFolder =
    +        FileFactory.getCarbonFile(location + 
CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    +    boolean isRelative = false;
    +    if (location.startsWith(tablePath)) {
    +      location = location.substring(tablePath.length(), location.length());
    +      isRelative = true;
    +    }
    +    if (tempFolder.exists() && partionNames.size() > 0) {
    +      CarbonFile[] carbonFiles = tempFolder.listFiles(new 
CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().startsWith(taskNo) && file.getName()
    +              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
    +        }
    +      });
    +      if (carbonFiles != null && carbonFiles.length > 0) {
    +        SegmentFile segmentFile = new SegmentFile();
    +        Map<String, FolderDetails> locationMap = new HashMap<>();
    +        FolderDetails folderDetails = new FolderDetails();
    +        folderDetails.setRelative(isRelative);
    +        folderDetails.setPartitions(partionNames);
    +        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
    +        for (CarbonFile file : carbonFiles) {
    +          folderDetails.getFiles().add(file.getName());
    +        }
    +        locationMap.put(location, folderDetails);
    +        segmentFile.setLocationMap(locationMap);
    +        String path = writePath + "/" + taskNo + 
CarbonTablePath.SEGMENT_EXT;
    +        // write segment info to new file.
    +        writeSegmentFile(segmentFile, path);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Writes the segment file in json format
    +   * @param segmentFile
    +   * @param path
    +   * @throws IOException
    +   */
    +  public void writeSegmentFile(SegmentFile segmentFile, String path) 
throws IOException {
    +    AtomicFileOperations fileWrite =
    +        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
    +    BufferedWriter brWriter = null;
    +    DataOutputStream dataOutputStream = null;
    +    Gson gsonObjectToWrite = new Gson();
    +    try {
    +      dataOutputStream = 
fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    +      brWriter = new BufferedWriter(new 
OutputStreamWriter(dataOutputStream,
    +          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
    +
    +      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
    +      brWriter.write(metadataInstance);
    +    } finally {
    +      if (null != brWriter) {
    +        brWriter.flush();
    +      }
    +      CarbonUtil.closeStreams(brWriter);
    +      fileWrite.close();
    +    }
    +  }
    +
    +  /**
    +   * Merge all segment files in a segment to single file.
    +   *
    +   * @param writePath
    +   * @throws IOException
    +   */
    +  public SegmentFile mergeSegmentFiles(String readPath, String 
mergeFileName, String writePath)
    +      throws IOException {
    +    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
    +    if (segmentFiles != null && segmentFiles.length > 0) {
    +      SegmentFile segmentFile = null;
    +      for (CarbonFile file : segmentFiles) {
    +        SegmentFile localSegmentFile = 
readSegmentFile(file.getAbsolutePath());
    +        if (segmentFile == null && localSegmentFile != null) {
    +          segmentFile = localSegmentFile;
    +        }
    +        if (localSegmentFile != null) {
    +          segmentFile = segmentFile.merge(localSegmentFile);
    +        }
    +      }
    +      if (segmentFile != null) {
    +        String path = writePath + "/" + mergeFileName + 
CarbonTablePath.SEGMENT_EXT;
    +        writeSegmentFile(segmentFile, path);
    +        
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
    +      }
    +      return segmentFile;
    +    }
    +    return null;
    +  }
    +
    +  private CarbonFile[] getSegmentFiles(String segmentPath) {
    +    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
    +    if (carbonFile.exists()) {
    +      return carbonFile.listFiles(new CarbonFileFilter() {
    +        @Override public boolean accept(CarbonFile file) {
    +          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
    +        }
    +      });
    +    }
    +    return null;
    +  }
    +
    +  /**
    +   * It provides segment file only for the partitions which has physical 
index files.
    +   *
    +   * @param partitionSpecs
    +   */
    +  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String 
tablePath,
    +      List<PartitionSpec> partitionSpecs) {
    +    SegmentFile segmentFile = null;
    +    for (PartitionSpec spec : partitionSpecs) {
    +      String location = spec.getLocation().toString();
    +      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
    +      boolean isRelative = false;
    +      if (location.startsWith(tablePath)) {
    --- End diff --
    
    ok


---

Reply via email to