http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
new file mode 100644
index 0000000..fd9c0ef
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+/**
+ * interface will be used to record and log the query statistics
+ */
+public interface QueryStatisticsRecorder {
+
+  void recordStatistics(QueryStatistic statistic);
+
+  void logStatistics();
+
+  void logStatisticsAsTableExecutor();
+
+  void recordStatisticsForDriver(QueryStatistic statistic, String queryId);
+
+  void logStatisticsAsTableDriver();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
new file mode 100644
index 0000000..a751936
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+import java.io.Serializable;
+
+/**
+ * Class will be used to record and log the query statistics
+ */
+public class QueryStatisticsRecorderDummy implements 
QueryStatisticsRecorder,Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = -5719752001674467864L;
+
+  public QueryStatisticsRecorderDummy() {
+
+  }
+
+  /**
+   * Below method will be used to add the statistics
+   *
+   * @param statistic
+   */
+  public synchronized void recordStatistics(QueryStatistic statistic) {
+
+  }
+
+  /**
+   * Below method will be used to log the statistic
+   */
+  public void logStatistics() {
+
+  }
+
+  /**
+   * Below method will be used to show statistic log as table
+   */
+  public void logStatisticsAsTableExecutor() {
+
+  }
+
+  public void recordStatisticsForDriver(QueryStatistic statistic, String 
queryId) {
+
+  }
+
+  public void logStatisticsAsTableDriver() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
new file mode 100644
index 0000000..9cfeab4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.stats;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import static org.apache.carbondata.core.util.CarbonUtil.printLine;
+
+/**
+ * Class will be used to record and log the query statistics
+ */
+public class QueryStatisticsRecorderImpl implements 
QueryStatisticsRecorder,Serializable {
+
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(QueryStatisticsRecorderImpl.class.getName());
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = -5719752001674467864L;
+
+  /**
+   * list for statistics to record time taken
+   * by each phase of the query for example aggregation
+   * scanning,block loading time etc.
+   */
+  private List<QueryStatistic> queryStatistics;
+
+  /**
+   * query with taskd
+   */
+  private String queryIWthTask;
+
+  public QueryStatisticsRecorderImpl(String queryId) {
+    queryStatistics = new ArrayList<QueryStatistic>();
+    this.queryIWthTask = queryId;
+  }
+
+  /**
+   * Below method will be used to add the statistics
+   *
+   * @param statistic
+   */
+  public synchronized void recordStatistics(QueryStatistic statistic) {
+    queryStatistics.add(statistic);
+  }
+
+  /**
+   * Below method will be used to log the statistic
+   */
+  public void logStatistics() {
+    for (QueryStatistic statistic : queryStatistics) {
+      LOGGER.statistic(statistic.getStatistics(queryIWthTask));
+    }
+  }
+
+  /**
+   * Below method will be used to show statistic log as table
+   */
+  public void logStatisticsAsTableExecutor() {
+    String tableInfo = collectExecutorStatistics();
+    if (null != tableInfo) {
+      LOGGER.statistic(tableInfo);
+    }
+  }
+
+  /**
+   * Below method will parse queryStatisticsMap and put time into table
+   */
+  public String collectExecutorStatistics() {
+    String load_blocks_time = "";
+    String scan_blocks_time = "";
+    String scan_blocks_num = "";
+    String load_dictionary_time = "";
+    String result_size = "";
+    String total_executor_time = "";
+    String splitChar = " ";
+    String total_blocklet = "";
+    String valid_scan_blocklet = "";
+    try {
+      for (QueryStatistic statistic : queryStatistics) {
+        switch (statistic.getMessage()) {
+          case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
+            load_blocks_time += statistic.getTimeTaken() + splitChar;
+            break;
+          case QueryStatisticsConstants.SCAN_BLOCKS_TIME:
+            scan_blocks_time += statistic.getTimeTaken() + splitChar;
+            break;
+          case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
+            scan_blocks_num += statistic.getCount() + splitChar;
+            break;
+          case QueryStatisticsConstants.LOAD_DICTIONARY:
+            load_dictionary_time += statistic.getTimeTaken() + splitChar;
+            break;
+          case QueryStatisticsConstants.RESULT_SIZE:
+            result_size += statistic.getCount() + splitChar;
+            break;
+          case QueryStatisticsConstants.EXECUTOR_PART:
+            total_executor_time += statistic.getTimeTaken() + splitChar;
+            break;
+          case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
+            total_blocklet = statistic.getCount() + splitChar;
+            break;
+          case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
+            valid_scan_blocklet = statistic.getCount() + splitChar;
+            break;
+          default:
+            break;
+        }
+      }
+      String headers = 
"task_id,load_blocks_time,load_dictionary_time,scan_blocks_time," +
+          "total_executor_time,scan_blocks_num,total_blocklet," +
+          "valid_scan_blocklet,result_size";
+      List<String> values = new ArrayList<String>();
+      values.add(queryIWthTask);
+      values.add(load_blocks_time);
+      values.add(load_dictionary_time);
+      values.add(scan_blocks_time);
+      values.add(total_executor_time);
+      values.add(scan_blocks_num);
+      values.add(total_blocklet);
+      values.add(valid_scan_blocklet);
+      values.add(result_size);
+      StringBuilder tableInfo = new StringBuilder();
+      String[] columns = headers.split(",");
+      String line = "";
+      String hearLine = "";
+      String valueLine = "";
+      for (int i = 0; i < columns.length; i++) {
+        int len = Math.max(columns[i].length(), values.get(i).length());
+        line += "+" + printLine("-", len);
+        hearLine += "|" + printLine(" ", len - columns[i].length()) + 
columns[i];
+        valueLine += "|" + printLine(" ", len - values.get(i).length()) + 
values.get(i);
+      }
+      // struct table info
+      tableInfo.append(line + "+").append("\n");
+      tableInfo.append(hearLine + "|").append("\n");
+      tableInfo.append(line + "+").append("\n");
+      tableInfo.append(valueLine + "|").append("\n");
+      tableInfo.append(line + "+").append("\n");
+      return "Print query statistic for each task id:" + "\n" + 
tableInfo.toString();
+    } catch (Exception ex) {
+      return "Put statistics into table failed, catch exception: " + 
ex.getMessage();
+    }
+  }
+
+  public void recordStatisticsForDriver(QueryStatistic statistic, String 
queryId) {
+
+  }
+
+  public void logStatisticsAsTableDriver() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
new file mode 100644
index 0000000..70043d4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class LoadMetadataDetails implements Serializable {
+
+  private static final long serialVersionUID = 1106104914918491724L;
+  private String timestamp;
+  private String loadStatus;
+  private String loadName;
+  private String partitionCount;
+  private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
+
+  // update delta end timestamp
+  private String updateDeltaEndTimestamp = "";
+
+  // update delta start timestamp
+  private String updateDeltaStartTimestamp = "";
+
+  // this will represent the update status file name at that point of time.
+  private String updateStatusFileName = "";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
+
+  // dont remove static as the write will fail.
+  private static final SimpleDateFormat parser =
+      new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+  /**
+   * Segment modification or deletion time stamp
+   */
+  private String modificationOrdeletionTimesStamp;
+  private String loadStartTime;
+
+  private String mergedLoadName;
+  /**
+   * visibility is used to determine whether to the load is visible or not.
+   */
+  private String visibility = "true";
+
+  /**
+   * To know if the segment is a major compacted segment or not.
+   */
+  private String majorCompacted;
+
+  public String getPartitionCount() {
+    return partitionCount;
+  }
+
+  public void setPartitionCount(String partitionCount) {
+    this.partitionCount = partitionCount;
+  }
+
+  public long getLoadEndTime() {
+    return convertTimeStampToLong(timestamp);
+  }
+
+  public void setLoadEndTime(long timestamp) {
+    this.timestamp = getTimeStampConvertion(timestamp);;
+  }
+
+  public String getLoadStatus() {
+    return loadStatus;
+  }
+
+  public void setLoadStatus(String loadStatus) {
+    this.loadStatus = loadStatus;
+  }
+
+  public String getLoadName() {
+    return loadName;
+  }
+
+  public void setLoadName(String loadName) {
+    this.loadName = loadName;
+  }
+
+  /**
+   * @return the modificationOrdeletionTimesStamp
+   */
+  public long getModificationOrdeletionTimesStamp() {
+    if(null == modificationOrdeletionTimesStamp) {
+      return 0;
+    }
+    return convertTimeStampToLong(modificationOrdeletionTimesStamp);
+  }
+
+  /**
+   * @param modificationOrdeletionTimesStamp the 
modificationOrdeletionTimesStamp to set
+   */
+  public void setModificationOrdeletionTimesStamp(long 
modificationOrdeletionTimesStamp) {
+    this.modificationOrdeletionTimesStamp =
+        getTimeStampConvertion(modificationOrdeletionTimesStamp);
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#hashCode()
+   */
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((loadName == null) ? 0 : loadName.hashCode());
+    return result;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#equals(java.lang.Object)
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+
+    }
+    if (!(obj instanceof LoadMetadataDetails)) {
+      return false;
+    }
+    LoadMetadataDetails other = (LoadMetadataDetails) obj;
+    if (loadName == null) {
+      if (other.loadName != null) {
+        return false;
+      }
+    } else if (!loadName.equals(other.loadName)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * @return the startLoadTime
+   */
+  public long getLoadStartTime() {
+    return convertTimeStampToLong(loadStartTime);
+  }
+
+  /**
+   * return loadStartTime
+   *
+   * @return
+   */
+  public long getLoadStartTimeAsLong() {
+    return (!loadStartTime.isEmpty()) ? getTimeStamp(loadStartTime) : 0;
+  }
+
+  /**
+   * This method will convert a given timestamp to long value and then to 
string back
+   *
+   * @param factTimeStamp
+   * @return
+   */
+  private long convertTimeStampToLong(String factTimeStamp) {
+    SimpleDateFormat parser = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return dateToStr.getTime();
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e.getMessage());
+      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      try {
+        dateToStr = parser.parse(factTimeStamp);
+        return dateToStr.getTime();
+      } catch (ParseException e1) {
+        LOGGER
+            .error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e1.getMessage());
+        return 0;
+      }
+    }
+  }
+
+  /**
+   * returns load start time as long value
+   *
+   * @param loadStartTime
+   * @return
+   */
+  public Long getTimeStamp(String loadStartTime) {
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(loadStartTime);
+      return dateToStr.getTime() * 1000;
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + loadStartTime + " to Time/Long type 
value" + e.getMessage());
+      return null;
+    }
+  }
+
+  private String getTimeStampConvertion(long time) {
+    SimpleDateFormat sdf = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    return sdf.format(time);
+  }
+
+  /**
+   * @param loadStartTime
+   */
+  public void setLoadStartTime(long loadStartTime) {
+    this.loadStartTime = getTimeStampConvertion(loadStartTime);
+  }
+
+  /**
+   * @return the mergedLoadName
+   */
+  public String getMergedLoadName() {
+    return mergedLoadName;
+  }
+
+  /**
+   * @param mergedLoadName the mergedLoadName to set
+   */
+  public void setMergedLoadName(String mergedLoadName) {
+    this.mergedLoadName = mergedLoadName;
+  }
+
+  /**
+   * @return the visibility
+   */
+  public String getVisibility() {
+    return visibility;
+  }
+
+  /**
+   * @param visibility the visibility to set
+   */
+  public void setVisibility(String visibility) {
+    this.visibility = visibility;
+  }
+
+  /**
+   * Return true if it is a major compacted segment.
+   * @return majorCompacted
+   */
+  public String isMajorCompacted() {
+    return majorCompacted;
+  }
+
+  /**
+   * Set true if it is a major compacted segment.
+   *
+   * @param majorCompacted
+   */
+  public void setMajorCompacted(String majorCompacted) {
+    this.majorCompacted = majorCompacted;
+  }
+
+  /**
+   * To get isDeleted property.
+   *
+   * @return isDeleted
+   */
+  public String getIsDeleted() {
+    return isDeleted;
+  }
+
+  /**
+   * To set isDeleted property.
+   *
+   * @param isDeleted
+   */
+  public void setIsDeleted(String isDeleted) {
+    this.isDeleted = isDeleted;
+  }
+
+  /**
+   * To get the update delta end timestamp
+   *
+   * @return updateDeltaEndTimestamp
+   */
+  public String getUpdateDeltaEndTimestamp() {
+    return updateDeltaEndTimestamp;
+  }
+
+  /**
+   * To set the update delta end timestamp
+   *
+   * @param updateDeltaEndTimestamp
+   */
+  public void setUpdateDeltaEndTimestamp(String updateDeltaEndTimestamp) {
+    this.updateDeltaEndTimestamp = updateDeltaEndTimestamp;
+  }
+
+  /**
+   * To get the update delta start timestamp
+   *
+   * @return updateDeltaStartTimestamp
+   */
+  public String getUpdateDeltaStartTimestamp() {
+    return updateDeltaStartTimestamp;
+  }
+
+  /**
+   * To set the update delta start timestamp
+   *
+   * @param updateDeltaStartTimestamp
+   */
+  public void setUpdateDeltaStartTimestamp(String updateDeltaStartTimestamp) {
+    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
+  }
+
+  /**
+   * To get the updateStatusFileName
+   *
+   * @return updateStatusFileName
+   */
+  public String getUpdateStatusFileName() {
+    return updateStatusFileName;
+  }
+
+  /**
+   * To set the updateStatusFileName
+   *
+   * @param updateStatusFileName
+   */
+  public void setUpdateStatusFileName(String updateStatusFileName) {
+    this.updateStatusFileName = updateStatusFileName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
new file mode 100644
index 0000000..1458d48
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.statusmanager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+
+/**
+ * Manages Load/Segment status
+ */
+public class SegmentStatusManager {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SegmentStatusManager.class.getName());
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) 
{
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  /**
+   * This will return the lock object used to lock the table status file 
before updation.
+   *
+   * @return
+   */
+  public ICarbonLock getTableStatusLock() {
+    return 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+            LockUsage.TABLE_STATUS_LOCK);
+  }
+
+  /**
+   * This method will return last modified time of tablestatus file
+   */
+  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier 
identifier)
+      throws IOException {
+    String tableStatusPath = CarbonStorePath
+        .getCarbonTablePath(identifier.getStorePath(), 
identifier.getCarbonTableIdentifier())
+        .getTableStatusFilePath();
+    if (!FileFactory.isFileExist(tableStatusPath, 
FileFactory.getFileType(tableStatusPath))) {
+      return 0L;
+    } else {
+      return FileFactory.getCarbonFile(tableStatusPath, 
FileFactory.getFileType(tableStatusPath))
+          .getLastModifiedTime();
+    }
+  }
+
+  public List<Long> getUpdateDeltaStartEndTimeStamp(final String loadIds,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray) {
+    List<Long> updateDeltaStartEndTimestamp = new ArrayList<>();
+    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+      if (loadIds.equalsIgnoreCase(loadMetadata.getLoadName())) {
+        // Make sure the Load is not compacted and not marked for delete.
+        if (CarbonCommonConstants.COMPACTED
+                .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+          return null;
+        } else if 
(CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
+          return null;
+        }
+        else {
+          return updateDeltaStartEndTimestamp;
+        }
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * get valid segment for given table
+   *
+   * @return
+   * @throws IOException
+   */
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws 
IOException {
+
+    // @TODO: move reading LoadStatus file to separate class
+    List<String> listOfValidSegments = new ArrayList<String>(10);
+    List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
+    List<String> listOfInvalidSegments = new ArrayList<String>(10);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+    String dataPath = carbonTablePath.getTableStatusFilePath();
+    DataInputStream dataInputStream = null;
+    Gson gsonObjectToRead = new Gson();
+    AtomicFileOperations fileOperation =
+            new AtomicFileOperationsImpl(dataPath, 
FileFactory.getFileType(dataPath));
+    LoadMetadataDetails[] loadFolderDetailsArray;
+    try {
+      if (FileFactory.isFileExist(dataPath, 
FileFactory.getFileType(dataPath))) {
+        dataInputStream = fileOperation.openForRead();
+        BufferedReader buffReader =
+                new BufferedReader(new InputStreamReader(dataInputStream, 
"UTF-8"));
+        loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, 
LoadMetadataDetails[].class);
+        //just directly iterate Array
+        List<LoadMetadataDetails> loadFolderDetails = 
Arrays.asList(loadFolderDetailsArray);
+        for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
+          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.MARKED_FOR_UPDATE
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+            // check for merged loads.
+            if (null != loadMetadataDetails.getMergedLoadName()) {
+              if 
(!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
+                
listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+              }
+              // if merged load is updated then put it in updated list
+              if (CarbonCommonConstants.MARKED_FOR_UPDATE
+                      .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+                
listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+              }
+              continue;
+            }
+
+            if (CarbonCommonConstants.MARKED_FOR_UPDATE
+                    .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+
+              
listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+            }
+            listOfValidSegments.add(loadMetadataDetails.getLoadName());
+          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.COMPACTED
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
+                  || CarbonCommonConstants.MARKED_FOR_DELETE
+                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
+            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+      throw e;
+    } finally {
+      try {
+        if (null != dataInputStream) {
+          dataInputStream.close();
+        }
+      } catch (Exception e) {
+        LOG.error(e);
+        throw e;
+      }
+    }
+    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, 
listOfValidUpdatedSegments,
+            listOfInvalidSegments);
+  }
+
+  /**
+   * This method reads the load metadata file
+   *
+   * @param tableFolderPath
+   * @return
+   */
+  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) 
{
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    String metadataFileName = tableFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.LOADMETADATA_FILENAME;
+    LoadMetadataDetails[] listOfLoadFolderDetailsArray;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(metadataFileName, 
FileFactory.getFileType(metadataFileName));
+
+    try {
+      if (!FileFactory.isFileExist(metadataFileName, 
FileFactory.getFileType(metadataFileName))) {
+        return new LoadMetadataDetails[0];
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      listOfLoadFolderDetailsArray =
+          gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
+    } catch (IOException e) {
+      return new LoadMetadataDetails[0];
+    } finally {
+      closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    return listOfLoadFolderDetailsArray;
+  }
+
+  /**
+   * compares two given date strings
+   *
+   * @param loadValue
+   * @param userValue
+   * @return -1 if first arg is less than second arg, 1 if first arg is 
greater than second arg,
+   * 0 otherwise
+   */
+  private static Integer compareDateValues(Long loadValue, Long userValue) {
+    return loadValue.compareTo(userValue);
+  }
+
+  /**
+   * updates deletion status
+   *
+   * @param loadIds
+   * @param tableFolderPath
+   * @return
+   */
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier 
identifier,
+      List<String> loadIds, String tableFolderPath) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = 
identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonDeleteSegmentLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.DELETE_SEGMENT_LOCK);
+    ICarbonLock carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK);
+    String tableDetails =
+        carbonTableIdentifier.getDatabaseName() + "." + 
carbonTableIdentifier.getTableName();
+    List<String> invalidLoadIds = new ArrayList<String>(0);
+    try {
+      if (carbonDeleteSegmentLock.lockWithRetries()) {
+        LOG.info("Delete segment lock has been successfully acquired");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
+        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+        if (!FileFactory.isFileExist(dataLoadLocation, 
FileFactory.getFileType(dataLoadLocation))) {
+          // log error.
+          LOG.error("Load metadata file is not present.");
+          return loadIds;
+        }
+        // read existing metadata details in load metadata.
+        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+        if (listOfLoadFolderDetailsArray != null && 
listOfLoadFolderDetailsArray.length != 0) {
+          updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, 
invalidLoadIds);
+          if (invalidLoadIds.isEmpty()) {
+            // All or None , if anything fails then dont write
+            if(carbonTableStatusLock.lockWithRetries()) {
+              LOG.info("Table status lock has been successfully acquired");
+              // To handle concurrency scenarios, always take latest metadata 
before writing
+              // into status file.
+              LoadMetadataDetails[] latestLoadMetadataDetails = 
readLoadMetadata(tableFolderPath);
+              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
+                  latestLoadMetadataDetails);
+              writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            }
+            else {
+              String errorMsg = "Delete segment by id is failed for " + 
tableDetails
+                  + ". Not able to acquire the table status lock due to other 
operation running "
+                  + "in the background.";
+              LOG.audit(errorMsg);
+              LOG.error(errorMsg);
+              throw new Exception(errorMsg + " Please try after some time.");
+            }
+
+          } else {
+            return invalidLoadIds;
+          }
+
+        } else {
+          LOG.audit("Delete segment by Id is failed. No matching segment id 
found.");
+          return loadIds;
+        }
+
+      } else {
+        String errorMsg = "Delete segment by id is failed for " + tableDetails
+            + ". Not able to acquire the delete segment lock due to another 
delete "
+            + "operation is running in the background.";
+        LOG.audit(errorMsg);
+        LOG.error(errorMsg);
+        throw new Exception(errorMsg + " Please try after some time.");
+      }
+    } catch (IOException e) {
+      LOG.error("IOException" + e.getMessage());
+    } finally {
+      CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK);
+      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, 
LockUsage.DELETE_SEGMENT_LOCK);
+    }
+
+    return invalidLoadIds;
+  }
+
+  /**
+   * updates deletion status
+   *
+   * @param loadDate
+   * @param tableFolderPath
+   * @return
+   */
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier 
identifier,
+      String loadDate, String tableFolderPath, Long loadStartTime) throws 
Exception {
+    CarbonTableIdentifier carbonTableIdentifier = 
identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonDeleteSegmentLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.DELETE_SEGMENT_LOCK);
+    ICarbonLock carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK);
+    String tableDetails =
+        carbonTableIdentifier.getDatabaseName() + "." + 
carbonTableIdentifier.getTableName();
+    List<String> invalidLoadTimestamps = new ArrayList<String>(0);
+    try {
+      if (carbonDeleteSegmentLock.lockWithRetries()) {
+        LOG.info("Delete segment lock has been successfully acquired");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
+        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+
+        if (!FileFactory.isFileExist(dataLoadLocation, 
FileFactory.getFileType(dataLoadLocation))) {
+          // log error.
+          LOG.error("Error message: " + "Load metadata file is not present.");
+          invalidLoadTimestamps.add(loadDate);
+          return invalidLoadTimestamps;
+        }
+        // read existing metadata details in load metadata.
+        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+        if (listOfLoadFolderDetailsArray != null && 
listOfLoadFolderDetailsArray.length != 0) {
+          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, 
invalidLoadTimestamps,
+              loadStartTime);
+          if (invalidLoadTimestamps.isEmpty()) {
+            if(carbonTableStatusLock.lockWithRetries()) {
+              LOG.info("Table status lock has been successfully acquired.");
+              // To handle concurrency scenarios, always take latest metadata 
before writing
+              // into status file.
+              LoadMetadataDetails[] latestLoadMetadataDetails = 
readLoadMetadata(tableFolderPath);
+              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
+                  latestLoadMetadataDetails);
+              writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            }
+            else {
+
+              String errorMsg = "Delete segment by date is failed for " + 
tableDetails
+                  + ". Not able to acquire the table status lock due to other 
operation running "
+                  + "in the background.";
+              LOG.audit(errorMsg);
+              LOG.error(errorMsg);
+              throw new Exception(errorMsg + " Please try after some time.");
+
+            }
+          } else {
+            return invalidLoadTimestamps;
+          }
+
+        } else {
+          LOG.audit("Delete segment by date is failed. No matching segment 
found.");
+          invalidLoadTimestamps.add(loadDate);
+          return invalidLoadTimestamps;
+        }
+
+      } else {
+        String errorMsg = "Delete segment by date is failed for " + 
tableDetails
+            + ". Not able to acquire the delete segment lock due to another 
delete "
+            + "operation is running in the background.";
+        LOG.audit(errorMsg);
+        LOG.error(errorMsg);
+        throw new Exception(errorMsg + " Please try after some time.");
+      }
+    } catch (IOException e) {
+      LOG.error("Error message: " + "IOException" + e.getMessage());
+    } finally {
+      CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK);
+      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, 
LockUsage.DELETE_SEGMENT_LOCK);
+    }
+
+    return invalidLoadTimestamps;
+  }
+
+  /**
+   * writes load details into a given file at @param dataLoadLocation
+   *
+   * @param dataLoadLocation
+   * @param listOfLoadFolderDetailsArray
+   * @throws IOException
+   */
+  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(dataLoadLocation, 
FileFactory.getFileType(dataLoadLocation));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    // write the updated data into the metadata file.
+
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = 
gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
+      brWriter.write(metadataInstance);
+    } catch (IOException ioe) {
+      LOG.error("Error message: " + ioe.getLocalizedMessage());
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+
+  }
+
+  /**
+   * updates deletion status details for each load and returns invalidLoadIds
+   *
+   * @param loadIds
+   * @param listOfLoadFolderDetailsArray
+   * @param invalidLoadIds
+   * @return invalidLoadIds
+   */
+  private static List<String> updateDeletionStatus(List<String> loadIds,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> 
invalidLoadIds) {
+    for (String loadId : loadIds) {
+      boolean loadFound = false;
+      // For each load id loop through data and if the
+      // load id is found then mark
+      // the metadata as deleted.
+      for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+
+        if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
+          // if the segment is compacted then no need to delete that.
+          if (CarbonCommonConstants.COMPACTED
+                  .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+            LOG.error("Cannot delete the Segment which is compacted. Segment 
is " + loadId);
+            invalidLoadIds.add(loadId);
+            return invalidLoadIds;
+          }
+          if 
(!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) 
{
+            loadFound = true;
+            
loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+            
loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
+            LOG.info("Segment ID " + loadId + " Marked for Delete");
+          }
+          break;
+        }
+      }
+
+      if (!loadFound) {
+        LOG.audit("Delete segment by ID is failed. No matching segment id 
found :" + loadId);
+        invalidLoadIds.add(loadId);
+        return invalidLoadIds;
+      }
+    }
+    return invalidLoadIds;
+  }
+
+  /**
+   * updates deletion status details for load and returns invalidLoadTimestamps
+   *
+   * @param loadDate
+   * @param listOfLoadFolderDetailsArray
+   * @param invalidLoadTimestamps
+   * @return invalidLoadTimestamps
+   */
+  public static List<String> updateDeletionStatus(String loadDate,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> 
invalidLoadTimestamps,
+      Long loadStartTime) {
+    // For each load timestamp loop through data and if the
+    // required load timestamp is found then mark
+    // the metadata as deleted.
+    boolean loadFound = false;
+    String loadStartTimeString = "Load Start Time: ";
+    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+      Integer result = 
compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
+      if (result < 0) {
+        if (CarbonCommonConstants.COMPACTED
+            .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + "as the segment has been compacted.");
+          continue;
+        }
+        if 
(!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) 
{
+          loadFound = true;
+          updateSegmentMetadataDetails(loadMetadata);
+          LOG.info("Info: " +
+              loadStartTimeString + loadMetadata.getLoadStartTime() +
+              " Marked for Delete");
+        }
+      }
+
+    }
+
+    if (!loadFound) {
+      invalidLoadTimestamps.add(loadDate);
+      LOG.audit("Delete segment by date is failed. No matching segment 
found.");
+      return invalidLoadTimestamps;
+    }
+    return invalidLoadTimestamps;
+  }
+
+  /**
+   * This method closes the streams
+   *
+   * @param streams - streams to close.
+   */
+  private static void closeStreams(Closeable... streams) {
+    // Added if to avoid NullPointerException in case one stream is being 
passed as null
+    if (null != streams) {
+      for (Closeable stream : streams) {
+        if (null != stream) {
+          try {
+            stream.close();
+          } catch (IOException e) {
+            LOG.error("Error while closing stream" + stream);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * updates table status details using latest metadata
+   *
+   * @param oldMetadata
+   * @param newMetadata
+   * @return
+   */
+
+  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
+      LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
+
+    List<LoadMetadataDetails> newListMetadata =
+        new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
+    for (LoadMetadataDetails oldSegment : oldMetadata) {
+      if 
(CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus()))
 {
+        
updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
+      }
+    }
+    return newListMetadata;
+  }
+
+  /**
+   * updates segment status and modificaton time details
+   *
+   * @param loadMetadata
+   */
+  public static void updateSegmentMetadataDetails(LoadMetadataDetails 
loadMetadata) {
+    // update status only if the segment is not marked for delete
+    if 
(!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus()))
 {
+      loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+      
loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
+    }
+  }
+
+  /**
+   * This API will return the update status file name.
+   * @param segmentList
+   * @return
+   */
+  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
+    if(segmentList.length == 0) {
+      return "";
+    }
+    else {
+      for(LoadMetadataDetails eachSeg : segmentList) {
+        // file name stored in 0th segment.
+        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
+          return eachSeg.getUpdateStatusFileName();
+        }
+      }
+    }
+    return "";
+  }
+
+  /**
+   * getting the task numbers present in the segment.
+   * @param segmentId
+   * @return
+   */
+  public List<String> getUpdatedTasksDetailsForSegment(String segmentId, 
SegmentUpdateStatusManager
+          updateStatusManager) {
+    List<String> taskList = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
+    for (String eachFileName : list) {
+      taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
+    }
+    return taskList;
+  }
+
+
+  public static class ValidAndInvalidSegmentsInfo {
+    private final List<String> listOfValidSegments;
+    private final List<String> listOfValidUpdatedSegments;
+    private final List<String> listOfInvalidSegments;
+
+    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
+        List<String> listOfValidUpdatedSegments, List<String> 
listOfInvalidUpdatedSegments) {
+      this.listOfValidSegments = listOfValidSegments;
+      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
+      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
+    }
+    public List<String> getInvalidSegments() {
+      return listOfInvalidSegments;
+    }
+    public List<String> getValidSegments() {
+      return listOfValidSegments;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
new file mode 100644
index 0000000..f14485b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -0,0 +1,990 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+
+/**
+ * Manages Segment & block status of carbon table for Delete operation
+ */
+public class SegmentUpdateStatusManager {
+
+  /**
+   * logger
+   */
+  private static final LogService LOG =
+      
LogServiceFactory.getLogService(SegmentUpdateStatusManager.class.getName());
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+  private LoadMetadataDetails[] segmentDetails;
+  private SegmentUpdateDetails[] updateDetails;
+  private CarbonTablePath carbonTablePath;
+  private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+
+  /**
+   * @param absoluteTableIdentifier
+   */
+  public SegmentUpdateStatusManager(AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+        absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    // current it is used only for read function scenarios, as file update 
always requires to work
+    // on latest file status.
+    segmentDetails =
+        
segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    updateDetails = readLoadMetadata();
+    populateMap();
+  }
+
+  /**
+   * populate the block and its details in a map.
+   */
+  private void populateMap() {
+    blockAndDetailsMap = new 
HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for(SegmentUpdateDetails blockDetails : updateDetails) {
+
+      String blockIdentifier = CarbonUpdateUtil
+          .getSegmentBlockNameKey(blockDetails.getSegmentName(), 
blockDetails.getActualBlockName());
+
+      blockAndDetailsMap.put(blockIdentifier, blockDetails);
+
+    }
+
+  }
+
+  /**
+   *
+   * @param segID
+   * @param actualBlockName
+   * @return null if block is not present in segment update status.
+   */
+  public SegmentUpdateDetails getDetailsForABlock(String segID, String 
actualBlockName) {
+
+    String blockIdentifier = CarbonUpdateUtil
+        .getSegmentBlockNameKey(segID, actualBlockName);
+
+    return blockAndDetailsMap.get(blockIdentifier);
+
+  }
+
+  /**
+   *
+   * @param key will be like (segid/blockname)  0/0-0-5464654654654
+   * @return
+   */
+  public SegmentUpdateDetails getDetailsForABlock(String key) {
+
+    return blockAndDetailsMap.get(key);
+
+  }
+
+
+
+  /**
+   * Returns the LoadMetadata Details
+   * @return
+   */
+  public LoadMetadataDetails[] getLoadMetadataDetails() {
+    return segmentDetails;
+  }
+
+  /**
+   *
+   * @param loadMetadataDetails
+   */
+  public void setLoadMetadataDetails(LoadMetadataDetails[] 
loadMetadataDetails) {
+    this.segmentDetails = loadMetadataDetails;
+  }
+
+  /**
+   * Returns the UpdateStatus Details.
+   * @return
+   */
+  public SegmentUpdateDetails[] getUpdateStatusDetails() {
+    return updateDetails;
+  }
+
+  /**
+   *
+   * @param segmentUpdateDetails
+   */
+  public void setUpdateStatusDetails(SegmentUpdateDetails[] 
segmentUpdateDetails) {
+    this.updateDetails = segmentUpdateDetails;
+  }
+
+  /**
+   * This will return the lock object used to lock the table update status 
file before updation.
+   *
+   * @return
+   */
+  public ICarbonLock getTableUpdateStatusLock() {
+    return 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+        LockUsage.TABLE_UPDATE_STATUS_LOCK);
+  }
+
+  /**
+   * Returns all delete delta files of specified block
+   *
+   * @param tupleId
+   * @return
+   * @throws Exception
+   */
+  public List<String> getDeleteDeltaFiles(String tupleId) throws Exception {
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
+  }
+
+
+  /**
+   * Returns all update delta files of specified Segment.
+   *
+   * @param segmentId
+   * @return
+   * @throws Exception
+   */
+  public List<String> getUpdateDeltaFiles(final String segmentId) {
+    List<String> updatedDeltaFilesList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", 
segmentId);
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+      }
+    }
+    // if start timestamp is empty then no update delta is found. so return 
empty list.
+    if (startTimeStamp.isEmpty()) {
+      return updatedDeltaFilesList;
+    }
+    final Long endTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+
+    // else scan the segment for the delta files with the respective timestamp.
+    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 
1,
+                  firstPart.length()));
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+
+            // if marked for delete then it is invalid.
+            if (!isBlockValid(segmentId, fileName)) {
+              return false;
+            }
+
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    for (CarbonFile cfile : files) {
+      updatedDeltaFilesList.add(cfile.getCanonicalPath());
+    }
+
+    return updatedDeltaFilesList;
+  }
+
+  /**
+   * Returns all deleted records of specified block
+   *
+   * @param tupleId
+   * @return
+   * @throws Exception
+   */
+  public int[] getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception 
{
+    List<String> deltaFiles = getDeltaFiles(tupleId, 
CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
+    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
+    String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.BLOCKLET_ID);
+    return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
+  }
+
+
+
+  /**
+   * Returns all delta file paths of specified block
+   *
+   * @param tupleId
+   * @param extension
+   * @return
+   * @throws Exception
+   */
+  public List<String> getDeltaFiles(String tupleId, String extension) throws 
Exception {
+    try {
+      CarbonTablePath carbonTablePath = CarbonStorePath
+          .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+              absoluteTableIdentifier.getCarbonTableIdentifier());
+      String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.SEGMENT_ID);
+      String carbonDataDirectoryPath = 
carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+      String completeBlockName = CarbonTablePath.addDataPartPrefix(
+          CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.BLOCK_ID)
+              + CarbonCommonConstants.FACT_FILE_EXT);
+      String blockPath =
+          carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + 
completeBlockName;
+      CarbonFile file = FileFactory.getCarbonFile(blockPath, 
FileFactory.getFileType(blockPath));
+      if (!file.exists()) {
+        throw new Exception("Invalid tuple id " + tupleId);
+      }
+      String blockNameWithoutExtn = completeBlockName.substring(0, 
completeBlockName.indexOf('.'));
+      //blockName without timestamp
+      final String blockNameFromTuple =
+          blockNameWithoutExtn.substring(0, 
blockNameWithoutExtn.lastIndexOf("-"));
+      SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
+          readLoadMetadata();
+      return getDeltaFiles(file, blockNameFromTuple, 
listOfSegmentUpdateDetailsArray, extension,
+          segment);
+
+    } catch (Exception ex) {
+      String errorMsg = "Invalid tuple id " + tupleId;
+      LOG.error(errorMsg);
+      throw new Exception(errorMsg);
+    }
+  }
+
+  /**
+   * This method returns the list of Blocks associated with the segment
+   * from the SegmentUpdateDetails List.
+   * @param segmentName
+   * @return
+   */
+  public List<String> getBlockNameFromSegment(String segmentName) {
+    List<String> blockNames = new ArrayList<String>();
+    for (SegmentUpdateDetails block : updateDetails) {
+      if (block.getSegmentName().equalsIgnoreCase(segmentName) && 
!CarbonUpdateUtil
+          .isBlockInvalid(block.getStatus())) {
+        blockNames.add(block.getBlockName());
+      }
+    }
+    return blockNames;
+  }
+
+  /**
+   *
+   * @param segName
+   * @param blockName
+   * @return
+   */
+  public boolean isBlockValid(String segName, String blockName) {
+
+    SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);
+
+    if (details == null || 
!CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+      return true;
+    }
+
+    return false;
+  }
+  /**
+   * Returns all delta file paths of specified block
+   *
+   * @param blockDir
+   * @param blockNameFromTuple
+   * @param listOfSegmentUpdateDetailsArray
+   * @param extension
+   * @return
+   */
+  public List<String> getDeltaFiles(CarbonFile blockDir, final String 
blockNameFromTuple,
+      SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray,
+      final String extension,
+      String segment) {
+    List<String> deleteFileList = null;
+    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+      if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && 
block.getSegmentName()
+          .equalsIgnoreCase(segment) && 
!CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+        final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, 
block);
+        // If there is no delete delete file , then return null
+        if(deltaStartTimestamp == 0) {
+          return deleteFileList;
+        }
+        final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block);
+
+        // final long deltaEndTimeStamp = block.getDeleteDeltaEndTimeAsLong();
+        // final long deltaStartTimestamp = 
block.getDeleteDeltaStartTimeAsLong();
+        return getFilePaths(blockDir, blockNameFromTuple, extension, 
deleteFileList,
+            deltaStartTimestamp, deltaEndTimeStamp);
+      }
+    }
+    return deleteFileList;
+  }
+
+  private List<String> getFilePaths(CarbonFile blockDir, final String 
blockNameFromTuple,
+      final String extension, List<String> deleteFileList, final long 
deltaStartTimestamp,
+      final long deltaEndTimeStamp) {
+    CarbonFile[] files = blockDir.getParentFile().listFiles(new 
CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(extension)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+          String blockName =
+              firstPart.substring(0, 
firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 
1,
+                  firstPart.length()));
+          if (blockNameFromTuple.equals(blockName) && ((
+              Long.compare(timestamp, deltaEndTimeStamp ) <= 0) && (
+              Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    for (CarbonFile cfile : files) {
+      if(null == deleteFileList) {
+        deleteFileList = new ArrayList<String>(files.length);
+      }
+      deleteFileList.add(cfile.getCanonicalPath());
+    }
+    return deleteFileList;
+  }
+
+  /**
+   * Return all delta file for a block.
+   * @param segmentId
+   * @param blockName
+   * @return
+   */
+  public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final 
String blockName) {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", 
segmentId);
+
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
+
+    for (SegmentUpdateDetails block : updateDetails) {
+      if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
+          (block.getSegmentName().equalsIgnoreCase(segmentId))
+          && !CarbonUpdateUtil.isBlockInvalid((block.getStatus()))) {
+        final long deltaStartTimestamp =
+            
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+        final long deltaEndTimeStamp =
+            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, 
block);
+        CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile pathName) {
+            String fileName = pathName.getName();
+            if 
(fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+              String firstPart = fileName.substring(0, fileName.indexOf('.'));
+              String blkName = firstPart.substring(0, 
firstPart.lastIndexOf("-"));
+              long timestamp = Long.parseLong(
+                  firstPart.substring(firstPart.lastIndexOf("-") + 1, 
firstPart.length()));
+              if (blockName.equals(blkName) && (Long.compare(timestamp, 
deltaEndTimeStamp) <= 0)
+                  && (Long.compare(timestamp, deltaStartTimestamp) >= 0)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        });
+
+        return files;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns all update delta files of specified Segment.
+   *
+   * @param segmentId
+   * @param validUpdateFiles if true then only the valid range files will be 
returned.
+   * @return
+   */
+  public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean 
validUpdateFiles,
+      final String fileExtension, final boolean excludeOriginalFact,
+      CarbonFile[] allFilesOfSegment) {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    long factTimeStamp = 0;
+
+    LoadMetadataDetails[] segmentDetails =
+        
segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+        factTimeStamp = eachSeg.getLoadStartTime();
+      }
+    }
+
+    // if start timestamp is empty then no update delta is found. so return 
empty list.
+    if (startTimeStamp.isEmpty()) {
+      return new CarbonFile[0];
+    }
+
+    final Long endTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+    final long factTimeStampFinal = factTimeStamp;
+
+    List<CarbonFile> listOfCarbonFiles =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // else scan the segment for the delta files with the respective timestamp.
+
+    for (CarbonFile eachFile : allFilesOfSegment) {
+
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(fileExtension)) {
+        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+        long timestamp = Long.parseLong(firstPart
+            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                firstPart.length()));
+
+        if (excludeOriginalFact) {
+          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
+            continue;
+          }
+        }
+
+        if (validUpdateFiles) {
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        } else {
+          // invalid cases.
+          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        }
+      }
+    }
+
+    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
+  }
+
+  /**
+   * Returns all update delta files of specified Segment.
+   *
+   * @param segmentId
+   * @param validUpdateFiles
+   * @param fileExtension
+   * @param excludeOriginalFact
+   * @param allFilesOfSegment
+   * @return
+   */
+  public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId,
+      final boolean validUpdateFiles, final String fileExtension, final 
boolean excludeOriginalFact,
+      CarbonFile[] allFilesOfSegment) {
+
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    long factTimeStamp = 0;
+
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+        factTimeStamp = eachSeg.getLoadStartTime();
+      }
+    }
+
+    // if start timestamp is empty then no update delta is found. so return 
empty list.
+    if (startTimeStamp.isEmpty()) {
+      return new CarbonFile[0];
+    }
+
+    final Long endTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+    final long factTimeStampFinal = factTimeStamp;
+
+    List<CarbonFile> listOfCarbonFiles =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // else scan the segment for the delta files with the respective timestamp.
+
+    for (CarbonFile eachFile : allFilesOfSegment) {
+
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(fileExtension)) {
+        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+        long timestamp = Long.parseLong(firstPart
+            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                firstPart.length()));
+
+        if (excludeOriginalFact) {
+          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
+            continue;
+          }
+        }
+
+        if (validUpdateFiles) {
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+
+            boolean validBlock = true;
+
+            for(SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
+              if 
(blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
+                  && 
CarbonUpdateUtil.isBlockInvalid(blockDetails.getStatus())) {
+                validBlock = false;
+              }
+            }
+
+            if (validBlock) {
+              listOfCarbonFiles.add(eachFile);
+            }
+
+          }
+        } else {
+          // invalid cases.
+          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        }
+      }
+    }
+
+    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
+  }
+
+  /**
+   *
+   * @param extension
+   * @param block
+   * @return
+   */
+  private long getStartTimeOfDeltaFile(String extension, SegmentUpdateDetails 
block) {
+    long startTimestamp;
+    switch (extension) {
+      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
+        startTimestamp = block.getDeleteDeltaStartTimeAsLong();
+        break;
+      default:
+        startTimestamp = 0;
+    }
+    return startTimestamp;
+  }
+
+  /**
+   *
+   * @param extension
+   * @param block
+   * @return
+   */
+  private long getEndTimeOfDeltaFile(String extension, SegmentUpdateDetails 
block) {
+    long endTimestamp;
+    switch (extension) {
+      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
+        endTimestamp = block.getDeleteDeltaEndTimeAsLong();
+        break;
+      default: endTimestamp = 0;
+    }
+    return endTimestamp;
+  }
+
+
+  /**
+   * This method loads segment update details
+   *
+   * @return
+   */
+  public SegmentUpdateDetails[] readLoadMetadata() {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;
+
+    // get the updated status file identifier from the table status.
+    String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
+
+    if(null == tableUpdateStatusIdentifier) {
+      return new SegmentUpdateDetails[0];
+    }
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableUpdateStatusPath =
+        carbonTablePath.getMetadataDirectoryPath() + 
CarbonCommonConstants.FILE_SEPARATOR
+            + tableUpdateStatusIdentifier;
+    AtomicFileOperations fileOperation = new 
AtomicFileOperationsImpl(tableUpdateStatusPath,
+        FileFactory.getFileType(tableUpdateStatusPath));
+
+    try {
+      if (!FileFactory
+          .isFileExist(tableUpdateStatusPath, 
FileFactory.getFileType(tableUpdateStatusPath))) {
+        return new SegmentUpdateDetails[0];
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      buffReader = new BufferedReader(inStream);
+      listOfSegmentUpdateDetailsArray =
+          gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
+    } catch (IOException e) {
+      return new SegmentUpdateDetails[0];
+    } finally {
+      closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    return listOfSegmentUpdateDetailsArray;
+  }
+
+  /**
+   * @return updateStatusFileName
+   */
+  private String getUpdatedStatusIdentifier() {
+    SegmentStatusManager ssm = new 
SegmentStatusManager(absoluteTableIdentifier);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    LoadMetadataDetails[] loadDetails =
+        ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    if (loadDetails.length == 0) {
+      return null;
+    }
+    return loadDetails[0].getUpdateStatusFileName();
+  }
+
+  /**
+   * writes segment update details into a given file at @param dataLoadLocation
+   *
+   * @param listOfSegmentUpdateDetailsArray
+   * @throws IOException
+   */
+  public void writeLoadDetailsIntoFile(List<SegmentUpdateDetails> 
listOfSegmentUpdateDetailsArray,
+      String updateStatusFileIdentifier) throws IOException {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String fileLocation =
+        carbonTablePath.getMetadataDirectoryPath() + 
CarbonCommonConstants.FILE_SEPARATOR
+            + 
CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier);
+
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(fileLocation, 
FileFactory.getFileType(fileLocation));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    // write the updated data into the metadata file.
+
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+
+      String metadataInstance = 
gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
+      brWriter.write(metadataInstance);
+    } catch (IOException ioe) {
+      LOG.error("Error message: " + ioe.getLocalizedMessage());
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+
+  }
+
+  /**
+   * compares passed time stamp with status file delete timestamp and
+   * returns latest timestamp from status file if both are not equal
+   * returns null otherwise
+   *
+   * @param completeBlockName
+   * @param timestamp
+   * @return
+   */
+  public String getTimestampForRefreshCache(String completeBlockName, String 
timestamp) {
+    long cacheTimestamp = 0;
+    if (null != timestamp) {
+      cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp);
+    }
+    String blockName = 
CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName(
+        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, 
TupleIdEnum.BLOCK_ID)));
+    String segmentId =
+        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, 
TupleIdEnum.SEGMENT_ID);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
+        readLoadMetadata();
+    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+      if (segmentId.equalsIgnoreCase(block.getSegmentName()) && 
block.getBlockName()
+          .equalsIgnoreCase(blockName) && 
!CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+        long deleteTimestampFromStatusFile = 
block.getDeleteDeltaEndTimeAsLong();
+        if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
+          return null;
+        } else {
+          return block.getDeleteDeltaEndTimestamp();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method closes the streams
+   *
+   * @param streams - streams to close.
+   */
+  private void closeStreams(Closeable... streams) {
+    // Added if to avoid NullPointerException in case one stream is being 
passed as null
+    if (null != streams) {
+      for (Closeable stream : streams) {
+        if (null != stream) {
+          try {
+            stream.close();
+          } catch (IOException e) {
+            LOG.error("Error while closing stream" + stream);
+          }
+        }
+      }
+    }
+  }
+  /**
+   * Get the invalid tasks in that segment.
+   * @param segmentId
+   * @return
+   */
+  public List<String> getInvalidBlockList(String segmentId) {
+
+    // get the original fact file timestamp from the table status file.
+    List<String> listOfInvalidBlocks = new ArrayList<String>();
+    SegmentStatusManager ssm = new 
SegmentStatusManager(absoluteTableIdentifier);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    LoadMetadataDetails[] segmentDetails =
+        ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    long timestampOfOriginalFacts = 0;
+
+    String startTimestampOfUpdate = "" ;
+    String endTimestampOfUpdate = "";
+
+    for(LoadMetadataDetails segment : segmentDetails){
+      // find matching segment and return timestamp.
+      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
+        timestampOfOriginalFacts = segment.getLoadStartTime();
+        startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp();
+        endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp();
+      }
+    }
+
+    if (startTimestampOfUpdate.isEmpty()) {
+      return listOfInvalidBlocks;
+
+    }
+
+    // now after getting the original fact timestamp, what ever is remaining
+    // files need to cross check it with table status file.
+
+    // filter out the fact files.
+
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", 
segmentId);
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
+
+    final Long endTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate);
+    final Long startTimeStampFinal = 
CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate);
+    final Long timeStampOriginalFactFinal =
+        timestampOfOriginalFacts;
+
+    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 
1,
+                  firstPart.length()));
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+            return false;
+          }
+          if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) {
+            return false;
+          }
+          // take the rest of files as they are invalid.
+          return true;
+        }
+        return false;
+      }
+    });
+
+    // gather the task numbers.
+    for(CarbonFile updateFiles : files) {
+      listOfInvalidBlocks.add(updateFiles.getName());
+    }
+
+    return listOfInvalidBlocks;
+  }
+  /**
+   * Returns the invalid timestamp range of a segment.
+   * @param segmentId
+   * @return
+   */
+  public UpdateVO getInvalidTimestampRange(String segmentId) {
+    UpdateVO range = new UpdateVO();
+    for (LoadMetadataDetails segment : segmentDetails) {
+      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
+        range.setSegmentId(segmentId);
+        range.setFactTimestamp(segment.getLoadStartTime());
+        if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment
+            .getUpdateDeltaEndTimestamp().isEmpty()) {
+          range.setUpdateDeltaStartTimestamp(
+              
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
+          range.setLatestUpdateTimestamp(
+              
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
+        }
+      }
+    }
+    return range;
+  }
+  /**
+   *
+   * @param segmentId
+   * @param block
+   * @param needCompleteList
+   * @return
+   */
+  public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
+      final SegmentUpdateDetails block, final boolean needCompleteList,
+      CarbonFile[] allSegmentFiles) {
+
+    final long deltaStartTimestamp =
+        getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, 
block);
+
+    List<CarbonFile> files =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (CarbonFile eachFile : allSegmentFiles) {
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+        String blkName = 
CarbonTablePath.DataFileUtil.getBlockNameFromDeleteDeltaFile(fileName);
+
+        // complete list of delta files of that block is returned.
+        if (needCompleteList && 
block.getBlockName().equalsIgnoreCase(blkName)) {
+          files.add(eachFile);
+        }
+
+        // invalid delete delta files only will be returned.
+        long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
+            
CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));
+
+        if (block.getBlockName().equalsIgnoreCase(blkName) && (
+            Long.compare(timestamp, deltaStartTimestamp) < 0)) {
+          files.add(eachFile);
+        }
+      }
+    }
+
+    return files.toArray(new CarbonFile[files.size()]);
+  }
+
+  /**
+   *
+   * @param blockName
+   * @param allSegmentFiles
+   * @return
+   */
+  public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] 
allSegmentFiles,
+                                              String actualBlockName) {
+    List<CarbonFile> files = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (CarbonFile eachFile : allSegmentFiles) {
+
+      // for carbon data.
+      if (eachFile.getName().equalsIgnoreCase(actualBlockName)) {
+        files.add(eachFile);
+      }
+
+      // get carbon index files of the block.
+      String taskNum = CarbonTablePath.DataFileUtil.getTaskNo(actualBlockName);
+      // String indexFileEndsWith = timestamp + 
CarbonTablePath.getCarbonIndexExtension();
+      if 
(eachFile.getName().endsWith(CarbonTablePath.getCarbonIndexExtension()) && 
eachFile
+          .getName().startsWith(taskNum)) {
+        files.add(eachFile);
+      }
+
+    }
+
+    return files.toArray(new CarbonFile[files.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java 
b/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
deleted file mode 100644
index 49a9dc4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/unsafe/CarbonUnsafe.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.unsafe;
-
-import java.lang.reflect.Field;
-
-import sun.misc.Unsafe;
-
-
-public final class CarbonUnsafe {
-
-  public static final int BYTE_ARRAY_OFFSET;
-
-  public static final int SHORT_ARRAY_OFFSET;
-
-  public static final int INT_ARRAY_OFFSET;
-
-  public static final int LONG_ARRAY_OFFSET;
-
-  public static final int DOUBLE_ARRAY_OFFSET;
-
-  public static Unsafe unsafe;
-
-  static {
-    try {
-      Field cause = Unsafe.class.getDeclaredField("theUnsafe");
-      cause.setAccessible(true);
-      unsafe = (Unsafe) cause.get(null);
-    } catch (Throwable var2) {
-      unsafe = null;
-    }
-    if (unsafe != null) {
-      BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class);
-      SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class);
-      INT_ARRAY_OFFSET = unsafe.arrayBaseOffset(int[].class);
-      LONG_ARRAY_OFFSET = unsafe.arrayBaseOffset(long[].class);
-      DOUBLE_ARRAY_OFFSET = unsafe.arrayBaseOffset(double[].class);
-    } else {
-      BYTE_ARRAY_OFFSET = 0;
-      SHORT_ARRAY_OFFSET = 0;
-      INT_ARRAY_OFFSET = 0;
-      LONG_ARRAY_OFFSET = 0;
-      DOUBLE_ARRAY_OFFSET = 0;
-    }
-  }
-}

Reply via email to