http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index f6e8e90..1de4702 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -24,9 +24,9 @@ import java.util.Date;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index ec1a710..bb5edee 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -24,15 +24,19 @@ import java.util.Date;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_DAY;
-import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_HOUR;
-import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_MIN;
-import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_SEC;
+import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp
+    .TimeStampGranularityConstants.TIME_GRAN_DAY;
+import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp
+    .TimeStampGranularityConstants.TIME_GRAN_HOUR;
+import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp
+    .TimeStampGranularityConstants.TIME_GRAN_MIN;
+import static 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp
+    .TimeStampGranularityConstants.TIME_GRAN_SEC;
 
 /**
  * The class provides the method to generate dictionary key and getting the 
actual value from

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java 
b/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
deleted file mode 100644
index 69c7cf5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.load;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- * blocks info
- * TODO Remove this class after removing of kettle.
- */
-public class BlockDetails extends FileSplit implements Serializable {
-
-  /**
-   * serialization version
-   */
-  private static final long serialVersionUID = 2293906691860002339L;
-  //block offset
-  private long blockOffset;
-  //block length
-  private long blockLength;
-  //file path which block belong to
-  private String filePath;
-  // locations where this block exists
-  private String[] locations;
-
-  public BlockDetails(Path filePath, long blockOffset, long blockLength, 
String[] locations) {
-    super(filePath, blockOffset, blockLength, locations);
-    this.filePath = filePath.toString();
-    this.blockOffset = blockOffset;
-    this.blockLength = blockLength;
-    this.locations = locations;
-  }
-
-  public long getBlockOffset() {
-    return blockOffset;
-  }
-
-  public long getBlockLength() {
-    return blockLength;
-  }
-
-  public String getFilePath() {
-    return FileFactory.getUpdatedFilePath(filePath);
-  }
-
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
-  public String[] getLocations() {
-    return locations;
-  }
-
-  /** The file containing this split's data. */
-  @Override
-  public Path getPath() { return new Path(filePath); }
-
-  /** The position of the first byte in the file to process. */
-  @Override
-  public long getStart() { return blockOffset; }
-
-  /** The number of bytes in the file to process. */
-  @Override
-  public long getLength() { return blockLength; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java 
b/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
deleted file mode 100644
index 2861b20..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.load;
-
-import java.io.Serializable;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class LoadMetadataDetails implements Serializable {
-
-  private static final long serialVersionUID = 1106104914918491724L;
-  private String timestamp;
-  private String loadStatus;
-  private String loadName;
-  private String partitionCount;
-  private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
-
-  // update delta end timestamp
-  private String updateDeltaEndTimestamp = "";
-
-  // update delta start timestamp
-  private String updateDeltaStartTimestamp = "";
-
-  // this will represent the update status file name at that point of time.
-  private String updateStatusFileName = "";
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
-
-  // dont remove static as the write will fail.
-  private static final SimpleDateFormat parser =
-      new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-  /**
-   * Segment modification or deletion time stamp
-   */
-  private String modificationOrdeletionTimesStamp;
-  private String loadStartTime;
-
-  private String mergedLoadName;
-  /**
-   * visibility is used to determine whether to the load is visible or not.
-   */
-  private String visibility = "true";
-
-  /**
-   * To know if the segment is a major compacted segment or not.
-   */
-  private String majorCompacted;
-
-  public String getPartitionCount() {
-    return partitionCount;
-  }
-
-  public void setPartitionCount(String partitionCount) {
-    this.partitionCount = partitionCount;
-  }
-
-  public long getLoadEndTime() {
-    return convertTimeStampToLong(timestamp);
-  }
-
-  public void setLoadEndTime(long timestamp) {
-    this.timestamp = getTimeStampConvertion(timestamp);;
-  }
-
-  public String getLoadStatus() {
-    return loadStatus;
-  }
-
-  public void setLoadStatus(String loadStatus) {
-    this.loadStatus = loadStatus;
-  }
-
-  public String getLoadName() {
-    return loadName;
-  }
-
-  public void setLoadName(String loadName) {
-    this.loadName = loadName;
-  }
-
-  /**
-   * @return the modificationOrdeletionTimesStamp
-   */
-  public long getModificationOrdeletionTimesStamp() {
-    if(null == modificationOrdeletionTimesStamp) {
-      return 0;
-    }
-    return convertTimeStampToLong(modificationOrdeletionTimesStamp);
-  }
-
-  /**
-   * @param modificationOrdeletionTimesStamp the 
modificationOrdeletionTimesStamp to set
-   */
-  public void setModificationOrdeletionTimesStamp(long 
modificationOrdeletionTimesStamp) {
-    this.modificationOrdeletionTimesStamp =
-        getTimeStampConvertion(modificationOrdeletionTimesStamp);
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#hashCode()
-   */
-  @Override public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((loadName == null) ? 0 : loadName.hashCode());
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#equals(java.lang.Object)
-   */
-  @Override public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-
-    }
-    if (!(obj instanceof LoadMetadataDetails)) {
-      return false;
-    }
-    LoadMetadataDetails other = (LoadMetadataDetails) obj;
-    if (loadName == null) {
-      if (other.loadName != null) {
-        return false;
-      }
-    } else if (!loadName.equals(other.loadName)) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * @return the startLoadTime
-   */
-  public long getLoadStartTime() {
-    return convertTimeStampToLong(loadStartTime);
-  }
-
-  /**
-   * return loadStartTime
-   *
-   * @return
-   */
-  public long getLoadStartTimeAsLong() {
-    return (!loadStartTime.isEmpty()) ? getTimeStamp(loadStartTime) : 0;
-  }
-
-  /**
-   * This method will convert a given timestamp to long value and then to 
string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  private long convertTimeStampToLong(String factTimeStamp) {
-    SimpleDateFormat parser = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return dateToStr.getTime();
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e.getMessage());
-      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      try {
-        dateToStr = parser.parse(factTimeStamp);
-        return dateToStr.getTime();
-      } catch (ParseException e1) {
-        LOGGER
-            .error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e1.getMessage());
-        return 0;
-      }
-    }
-  }
-
-  /**
-   * returns load start time as long value
-   *
-   * @param loadStartTime
-   * @return
-   */
-  public Long getTimeStamp(String loadStartTime) {
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(loadStartTime);
-      return dateToStr.getTime() * 1000;
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + loadStartTime + " to Time/Long type 
value" + e.getMessage());
-      return null;
-    }
-  }
-
-  private String getTimeStampConvertion(long time) {
-    SimpleDateFormat sdf = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    return sdf.format(time);
-  }
-
-  /**
-   * @param loadStartTime
-   */
-  public void setLoadStartTime(long loadStartTime) {
-    this.loadStartTime = getTimeStampConvertion(loadStartTime);
-  }
-
-  /**
-   * @return the mergedLoadName
-   */
-  public String getMergedLoadName() {
-    return mergedLoadName;
-  }
-
-  /**
-   * @param mergedLoadName the mergedLoadName to set
-   */
-  public void setMergedLoadName(String mergedLoadName) {
-    this.mergedLoadName = mergedLoadName;
-  }
-
-  /**
-   * @return the visibility
-   */
-  public String getVisibility() {
-    return visibility;
-  }
-
-  /**
-   * @param visibility the visibility to set
-   */
-  public void setVisibility(String visibility) {
-    this.visibility = visibility;
-  }
-
-  /**
-   * Return true if it is a major compacted segment.
-   * @return majorCompacted
-   */
-  public String isMajorCompacted() {
-    return majorCompacted;
-  }
-
-  /**
-   * Set true if it is a major compacted segment.
-   *
-   * @param majorCompacted
-   */
-  public void setMajorCompacted(String majorCompacted) {
-    this.majorCompacted = majorCompacted;
-  }
-
-  /**
-   * To get isDeleted property.
-   *
-   * @return isDeleted
-   */
-  public String getIsDeleted() {
-    return isDeleted;
-  }
-
-  /**
-   * To set isDeleted property.
-   *
-   * @param isDeleted
-   */
-  public void setIsDeleted(String isDeleted) {
-    this.isDeleted = isDeleted;
-  }
-
-  /**
-   * To get the update delta end timestamp
-   *
-   * @return updateDeltaEndTimestamp
-   */
-  public String getUpdateDeltaEndTimestamp() {
-    return updateDeltaEndTimestamp;
-  }
-
-  /**
-   * To set the update delta end timestamp
-   *
-   * @param updateDeltaEndTimestamp
-   */
-  public void setUpdateDeltaEndTimestamp(String updateDeltaEndTimestamp) {
-    this.updateDeltaEndTimestamp = updateDeltaEndTimestamp;
-  }
-
-  /**
-   * To get the update delta start timestamp
-   *
-   * @return updateDeltaStartTimestamp
-   */
-  public String getUpdateDeltaStartTimestamp() {
-    return updateDeltaStartTimestamp;
-  }
-
-  /**
-   * To set the update delta start timestamp
-   *
-   * @param updateDeltaStartTimestamp
-   */
-  public void setUpdateDeltaStartTimestamp(String updateDeltaStartTimestamp) {
-    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
-  }
-
-  /**
-   * To get the updateStatusFileName
-   *
-   * @return updateStatusFileName
-   */
-  public String getUpdateStatusFileName() {
-    return updateStatusFileName;
-  }
-
-  /**
-   * To set the updateStatusFileName
-   *
-   * @param updateStatusFileName
-   */
-  public void setUpdateStatusFileName(String updateStatusFileName) {
-    this.updateStatusFileName = updateStatusFileName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java 
b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
new file mode 100644
index 0000000..a27e8d6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.locks;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * This is the abstract base class of the lock implementations. It handles the
+ * retrying part of the locking.
+ */
+public abstract class AbstractCarbonLock implements ICarbonLock {
+  private int retryCount;
+
+  private int retryTimeout;
+
+  public abstract boolean lock();
+
+  /**
+   * Calls lock() up to retryCount times, sleeping retryTimeout seconds after each failed try.
+   * Returns false if every try fails or the thread is interrupted while sleeping.
+   */
+  public boolean lockWithRetries() {
+    try {
+      for (int i = 0; i < retryCount; i++) {
+        if (lock()) {
+          return true;
+        }
+        Thread.sleep(retryTimeout * 1000L);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+    }
+    return false;
+  }
+
+  /**
+   * Initializes the retry count and retry timeout from CarbonProperties.
+   * A value Integer.parseInt cannot parse falls back to the matching _DEFAULT constant.
+   */
+  protected void initRetry() {
+    String retries = CarbonProperties.getInstance()
+        
.getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK);
+    try {
+      retryCount = Integer.parseInt(retries);
+    } catch (NumberFormatException e) {
+      retryCount = 
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT;
+    }
+
+    String maxTimeout = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK);
+    try {
+      retryTimeout = Integer.parseInt(maxTimeout);
+    } catch (NumberFormatException e) {
+      retryTimeout = 
CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java 
b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
new file mode 100644
index 0000000..9890458
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * This class is a Lock factory class which is used to provide lock objects.
+ * Using this lock object client can request the lock and unlock.
+ */
+public class CarbonLockFactory {
+
+  /**
+   * Lock type read from the LOCK_TYPE property; selects which lock implementation is created.
+   */
+  private static String lockTypeConfigured;
+
+  static {
+    CarbonLockFactory.getLockTypeConfigured();
+  }
+
+  /**
+   * Creates a lock object of the configured lock type (compared in upper case) for a table.
+   *
+   * @param tableIdentifier table identifier handed to the lock implementation
+   * @param lockFile lock file name handed to the lock implementation
+   * @return LocalFileLock, ZooKeeperLocking or HdfsFileLock; any other type throws
+   */
+  public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier 
tableIdentifier,
+      String lockFile) {
+    switch (lockTypeConfigured.toUpperCase()) {
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+        return new LocalFileLock(tableIdentifier, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+        return new ZooKeeperLocking(tableIdentifier, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+        return new HdfsFileLock(tableIdentifier, lockFile);
+
+      default:
+        throw new UnsupportedOperationException("Not supported the lock type");
+    }
+  }
+
+  /**
+   * Creates a lock object of the configured lock type for an explicit lock file location.
+   * @param locFileLocation location handed to the lock implementation
+   * @param lockFile lock file name handed to the lock implementation
+   * @return carbon lock
+   */
+  public static ICarbonLock getCarbonLockObj(String locFileLocation, String 
lockFile) {
+    switch (lockTypeConfigured.toUpperCase()) {
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+        return new LocalFileLock(locFileLocation, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+        return new ZooKeeperLocking(locFileLocation, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+        return new HdfsFileLock(locFileLocation, lockFile);
+
+      default:
+        throw new UnsupportedOperationException("Not supported the lock type");
+    }
+  }
+
+  /**
+   * Reads the LOCK_TYPE property (default LOCK_TYPE_DEFAULT) into lockTypeConfigured.
+   */
+  private static void getLockTypeConfigured() {
+    lockTypeConfigured = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOCK_TYPE, 
CarbonCommonConstants.LOCK_TYPE_DEFAULT);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java 
b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
new file mode 100644
index 0000000..f8f85e5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class contains all carbon lock utilities
+ */
+public class CarbonLockUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLockUtil.class.getName());
+
+  /**
+   * Unlocks the given lock and logs the outcome for the known lock usages.
+   * A failed release is logged at error level for every lock usage.
+   *
+   * @param carbonLock lock to release
+   * @param locktype one of the LockUsage constants that are compared below; any other
+   *                 value is unlocked without a log message
+   */
+  public static void fileUnlock(ICarbonLock carbonLock, String locktype) {
+    if (carbonLock.unlock()) {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.info("Metadata lock has been successfully released");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.info("Table status lock has been successfully released");
+      } else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        LOGGER.info("Clean files lock has been successfully released");
+      } else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.info("Delete segments lock has been successfully released");
+      }
+    } else {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.error("Not able to release the metadata lock");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.error("Not able to release the table status lock");
+      } else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        // A lock that could not be released is a failure, so log it as an error.
+        LOGGER.error("Not able to release the clean files lock");
+      } else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.error("Not able to release the delete segments lock");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java 
b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
new file mode 100644
index 0000000..035f48f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class is used to handle the HDFS File locking.
+ * This is achieved by acquiring the data output stream with the append option.
+ */
+public class HdfsFileLock extends AbstractCarbonLock {
+
+  private static final LogService LOGGER =
+             LogServiceFactory.getLogService(HdfsFileLock.class.getName());
+  /**
+   * location hdfs file location
+   */
+  private String location;
+
+  private DataOutputStream dataOutputStream;
+
+  public static String tmpPath;
+
+  static {
+    Configuration conf = new Configuration(true);
+    String hdfsPath = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    // By default the HDFS lock file of a table is placed inside that table's store folder.
+    // If STORE_LOCATION is not available, the HDFS_TEMP_LOCATION system property is used.
+    tmpPath = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION,
+               System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
+    if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+      tmpPath = hdfsPath + tmpPath;
+    }
+  }
+
+  /**
+   * @param lockFileLocation path below tmpPath in which the lock file is placed
+   * @param lockFile name of the lock file
+   */
+  public HdfsFileLock(String lockFileLocation, String lockFile) {
+    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + 
lockFileLocation
+        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    LOGGER.info("HDFS lock path:"+this.location);
+    initRetry();
+  }
+
+  /**
+   * @param tableIdentifier supplies the database and table name forming the lock location
+   * @param lockFile name of the lock file
+   */
+  public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this(tableIdentifier.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
+        .getTableName(), lockFile);
+  }
+
+  /**
+   * Creates the lock file if it is missing and opens it for append; false on IOException.
+   */
+  @Override public boolean lock() {
+    try {
+      if (!FileFactory.isFileExist(location, 
FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(location, 
FileFactory.getFileType(location));
+      }
+      dataOutputStream =
+          FileFactory.getDataOutputStreamUsingAppend(location, 
FileFactory.getFileType(location));
+
+      return true;
+
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Closes the append stream, then deletes the lock file; false only when the close fails.
+   */
+  @Override
+  public boolean unlock() {
+    if (null != dataOutputStream) {
+      try {
+        dataOutputStream.close();
+      } catch (IOException e) {
+        return false;
+      } finally {
+        CarbonFile carbonFile =
+            FileFactory.getCarbonFile(location, 
FileFactory.getFileType(location));
+        if (carbonFile.exists()) {
+          if (carbonFile.delete()) {
+            LOGGER.info("Deleted the lock file " + location);
+          } else {
+            LOGGER.error("Not able to delete the lock file " + location);
+          }
+        } else {
+          LOGGER.error("Not able to delete the lock file because "
+              + "it is not existed in location " + location);
+        }
+      }
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java 
b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
new file mode 100644
index 0000000..d984821
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+/**
+ * Carbon lock interface which handles the locking and unlocking.
+ */
+public interface ICarbonLock {
+
+  /**
+   * Releases the acquired lock.
+   *
+   * @return the outcome of the unlock attempt as reported by the
+   *         implementation (presumably true on success - confirm against
+   *         the implementing classes)
+   */
+  boolean unlock();
+
+  /**
+   * Acquires the lock; if the lock is not obtained, the attempt is retried
+   * after the configured time.
+   *
+   * @return the outcome of the lock attempt as reported by the
+   *         implementation (presumably true once the lock is held - confirm
+   *         against the implementing classes)
+   */
+  boolean lockWithRetries();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java 
b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
new file mode 100644
index 0000000..b8b4eed
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * This class handles the file locking in the local file system.
+ * This will be handled using the file channel lock API.
+ *
+ * The stream, channel and lock fields are populated only while this instance
+ * actually holds the lock, so that unlock() called by an instance that never
+ * obtained the lock neither closes foreign handles nor deletes the lock file
+ * of the real holder.
+ */
+public class LocalFileLock extends AbstractCarbonLock {
+  /**
+   * location is the directory in which the lock file is kept.
+   */
+  private String location;
+
+  /**
+   * fileOutputStream of the local lock file, non-null only while locked
+   */
+  private FileOutputStream fileOutputStream;
+
+  /**
+   * channel is the FileChannel of the lock file, non-null only while locked
+   */
+  private FileChannel channel;
+
+  /**
+   * fileLock NIO FileLock Object, non-null only while locked
+   */
+  private FileLock fileLock;
+
+  /**
+   * name of the lock file
+   */
+  private String lockFile;
+
+  /**
+   * value of the java.io.tmpdir system property; every lock directory is
+   * created below it
+   */
+  public static final String tmpPath;
+
+  /**
+   * full path of the lock file, computed by lock()
+   */
+  private  String lockFilePath;
+
+  /**
+   * LOGGER for  logging the messages.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalFileLock.class.getName());
+
+  static {
+    tmpPath = System.getProperty("java.io.tmpdir");
+  }
+
+  /**
+   * @param lockFileLocation directory, relative to tmpPath, holding the lock
+   * @param lockFile         name of the lock file
+   */
+  public LocalFileLock(String lockFileLocation, String lockFile) {
+    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR
+        + lockFileLocation;
+    this.lockFile = lockFile;
+    initRetry();
+  }
+
+  /**
+   * @param tableIdentifier table whose database and table name form the
+   *                        lock directory
+   * @param lockFile        name of the lock file
+   */
+  public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    // the delegated constructor already calls initRetry()
+    this(tableIdentifier.getDatabaseName()
+        + CarbonCommonConstants.FILE_SEPARATOR
+        + tableIdentifier.getTableName(), lockFile);
+  }
+
+  /**
+   * Lock API for locking of the file channel of the lock file.
+   *
+   * Creates the lock directory and lock file when missing, opens the file
+   * and tries to lock its channel without blocking. When the lock is not
+   * obtained the freshly opened stream is closed again instead of being
+   * kept, so repeated attempts do not accumulate open file handles.
+   *
+   * NOTE(review): the FileLock documentation warns that on some systems
+   * closing any channel of a file releases all locks the JVM holds on that
+   * file; confirm this is acceptable when several instances in one JVM
+   * compete for the same lock file.
+   *
+   * @return true if the lock was obtained, false if it is held elsewhere or
+   *         an IOException occurred
+   */
+  @Override public boolean lock() {
+    FileOutputStream candidateStream = null;
+    try {
+      if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
+        FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
+      }
+      lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+      if (!FileFactory.isFileExist(lockFilePath,
+          FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(lockFilePath,
+            FileFactory.getFileType(location));
+      }
+
+      candidateStream = new FileOutputStream(lockFilePath);
+      FileChannel candidateChannel = candidateStream.getChannel();
+      FileLock candidateLock;
+      try {
+        candidateLock = candidateChannel.tryLock();
+      } catch (OverlappingFileLockException e) {
+        // this JVM already holds (or is acquiring) a lock on the file
+        candidateLock = null;
+      }
+      if (null == candidateLock) {
+        return false;
+      }
+      // success: publish the handles and keep the stream open
+      fileOutputStream = candidateStream;
+      channel = candidateChannel;
+      fileLock = candidateLock;
+      candidateStream = null;
+      return true;
+    } catch (IOException e) {
+      return false;
+    } finally {
+      closeQuietly(candidateStream);
+    }
+  }
+
+  /**
+   * Unlock API for unlocking of the acquired lock.
+   *
+   * Releases the file lock, closes the stream and deletes the lock file.
+   * When this instance holds no lock nothing is closed or deleted.
+   *
+   * @return false if releasing the file lock raised an IOException, true
+   *         otherwise
+   */
+  @Override public boolean unlock() {
+    boolean status;
+    try {
+      if (null != fileLock) {
+        fileLock.release();
+      }
+      status = true;
+    } catch (IOException e) {
+      status = false;
+    } finally {
+      fileLock = null;
+      if (null != fileOutputStream) {
+        try {
+          fileOutputStream.close();
+          // deleting the lock file after releasing the lock.
+          if (FileFactory.getCarbonFile(lockFilePath,
+              FileFactory.getFileType(lockFilePath)).delete()) {
+            LOGGER.info("Successfully deleted the lock file " + lockFilePath);
+          } else {
+            LOGGER.error("Not able to delete the lock file " + lockFilePath);
+          }
+        } catch (IOException e) {
+          LOGGER.error(e.getMessage());
+        } finally {
+          // forget the handles so that a repeated unlock() is a no-op
+          fileOutputStream = null;
+          channel = null;
+        }
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Closes the given stream (and with it its channel), logging instead of
+   * propagating a failure. A null stream is ignored.
+   */
+  private static void closeQuietly(FileOutputStream stream) {
+    if (null == stream) {
+      return;
+    }
+    try {
+      stream.close();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java 
b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
new file mode 100644
index 0000000..1c6ef8e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+/**
+ * Defines the use cases of the lock as String constants (this is a plain
+ * class of constants, not an enum). Each constant is the lock file name of
+ * one specific lock case.
+ */
+public class LockUsage {
+  public static final String LOCK = ".lock";
+  public static final String METADATA_LOCK = "meta.lock";
+  public static final String COMPACTION_LOCK = "compaction.lock";
+  public static final String SYSTEMLEVEL_COMPACTION_LOCK = 
"system_level_compaction.lock";
+  public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
+  public static final String TABLE_UPDATE_STATUS_LOCK = 
"tableupdatestatus.lock";
+  public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
+  public static final String CLEAN_FILES_LOCK = "clean_files.lock";
+  public static final String DROP_TABLE_LOCK = "droptable.lock";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java 
b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
new file mode 100644
index 0000000..5296069
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * For Handling the zookeeper locking implementation
+ */
+public class ZooKeeperLocking extends AbstractCarbonLock {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ZooKeeperLocking.class.getName());
+
+  /**
+   * zk is the zookeeper client instance
+   */
+  private static ZooKeeper zk;
+
+  /**
+   * zooKeeperLocation is the location in the zoo keeper file system where
+   * the locks will be maintained.
+   */
+  private static final String zooKeeperLocation =
+      CarbonCommonConstants.ZOOKEEPER_LOCATION;
+
+  /**
+   * Unique folder for each table, built from the database name and the
+   * table name
+   */
+  private final String tableIdFolder;
+
+  /**
+   * lockName is the name of the lock to use. This name should be same for
+   * every process that want to share the same lock
+   */
+  private String lockName;
+
+  /**
+   * lockPath is the unique path created for the each instance of the carbon
+   * lock. It is null while no lock node of this instance exists.
+   */
+  private String lockPath;
+
+  /**
+   * znode below tableIdFolder that holds the lock nodes of one lock type
+   */
+  private String lockTypeFolder;
+
+  /**
+   * @param tableIdentifier table whose database and table name form the
+   *                        lock location
+   * @param lockFile        name of the lock
+   */
+  public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier,
+      String lockFile) {
+    // join with the same separator that the other znode paths of this class
+    // are built with, instead of the platform dependent File.separator
+    this(tableIdentifier.getDatabaseName()
+        + CarbonCommonConstants.FILE_SEPARATOR
+        + tableIdentifier.getTableName(), lockFile);
+  }
+
+  /**
+   * Resolves the zookeeper client and makes sure the base node, the table
+   * folder and the lock type folder exist. Failures while creating the
+   * nodes are logged and not propagated.
+   *
+   * @param lockLocation path of the table folder below zooKeeperLocation
+   * @param lockFile     name of the lock
+   */
+  public ZooKeeperLocking(String lockLocation, String lockFile) {
+    this.lockName = lockFile;
+    this.tableIdFolder =
+        zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
+
+    String zooKeeperUrl = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ZOOKEEPER_URL);
+    zk = ZookeeperInit.getInstance(zooKeeperUrl).getZookeeper();
+
+    this.lockTypeFolder = zooKeeperLocation
+        + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
+        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    try {
+      createBaseNode();
+      // if exists returns null then path doesnt exist. so creating.
+      if (null == zk.exists(this.tableIdFolder, true)) {
+        createRecursivly(this.tableIdFolder);
+      }
+      // if exists returns null then path doesnt exist. so creating.
+      if (null == zk.exists(this.lockTypeFolder, true)) {
+        zk.create(this.lockTypeFolder, new byte[1], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      }
+    } catch (KeeperException e) {
+      LOGGER.error(e, e.getMessage());
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      LOGGER.error(e, e.getMessage());
+    }
+    initRetry();
+  }
+
+  /**
+   * Creating a znode in which all the znodes (lock files )are maintained.
+   */
+  private void createBaseNode() throws KeeperException, InterruptedException {
+    if (null == zk.exists(zooKeeperLocation, true)) {
+      // creating a znode in which all the znodes (lock files )are maintained.
+      zk.create(zooKeeperLocation, new byte[1], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    }
+  }
+
+  /**
+   * Creates the zookeeper node and its missing parents if they do not exist.
+   * The path is split on CarbonCommonConstants.FILE_SEPARATOR, the separator
+   * the paths of this class are assembled with.
+   *
+   * @param path znode path to create
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void createRecursivly(String path)
+      throws KeeperException, InterruptedException {
+    // check the length first: an empty path must not be sent to zookeeper
+    if (path.length() == 0 || zk.exists(path, true) != null) {
+      return;
+    }
+    int parentEnd = path.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
+    // parentEnd <= 0 means there is no parent segment left to create
+    if (parentEnd > 0) {
+      createRecursivly(path.substring(0, parentEnd));
+    }
+    zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Handling of the locking mechanism using zoo keeper.
+   *
+   * @return true if the node created by this call sorts first among the
+   *         children of the lock type folder, false otherwise or on error
+   */
+  @Override public boolean lock() {
+    try {
+      // create the lock file with lockName.
+      lockPath = zk.create(
+          this.lockTypeFolder + CarbonCommonConstants.FILE_SEPARATOR + lockName,
+          null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+      // get the children present in the lock type folder.
+      List<String> nodes = zk.getChildren(this.lockTypeFolder, null);
+
+      // sort the children
+      Collections.sort(nodes);
+
+      // here the logic is , for each lock request zookeeper will create a
+      // file ending with incremental digits.
+      // so first request will be 00001 next is 00002 and so on.
+      // if the current request is 00002 and already one previous
+      // request(00001) is present then get children will give both nodes.
+      // after the sort we are checking if the lock path is first or not .
+      // if it is first then lock has been acquired.
+      if (lockPath.endsWith(nodes.get(0))) {
+        return true;
+      }
+      // if locking failed then deleting the created lock as next time again
+      // new lock file will be created.
+      zk.delete(lockPath, -1);
+      lockPath = null;
+      return false;
+    } catch (KeeperException e) {
+      LOGGER.error(e, e.getMessage());
+      return false;
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      LOGGER.error(e, e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * @return status where lock file is unlocked or not. true is also
+   *         returned when this instance has no lock node to remove.
+   */
+  @Override public boolean unlock() {
+    if (null == lockPath) {
+      // nothing was locked; a null path must not be passed to zookeeper
+      return true;
+    }
+    try {
+      // exists will return null if the path doesn't exists.
+      if (null != zk.exists(lockPath, true)) {
+        zk.delete(lockPath, -1);
+        lockPath = null;
+      }
+    } catch (KeeperException e) {
+      LOGGER.error(e, e.getMessage());
+      return false;
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+      LOGGER.error(e, e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/locks/ZookeeperInit.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/ZookeeperInit.java 
b/core/src/main/java/org/apache/carbondata/core/locks/ZookeeperInit.java
new file mode 100644
index 0000000..3b6e0b8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ZookeeperInit.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.locks;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is a singleton class for initialization of zookeeper client.
+ */
+public class ZookeeperInit {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ZookeeperInit.class.getName());
+
+  /**
+   * the single instance; volatile because getInstance(String) reads it
+   * outside the synchronized block, and without volatile another thread
+   * could observe the instance before its zk field is visible
+   */
+  private static volatile ZookeeperInit zooKeeperInit;
+  /**
+   * zk is the zookeeper client instance; stays null when the client could
+   * not be created
+   */
+  private ZooKeeper zk;
+
+  /**
+   * Creates the zookeeper client for the given url. An IOException raised
+   * by the client constructor is logged and not propagated.
+   *
+   * @param zooKeeperUrl connect string handed to the ZooKeeper client
+   */
+  private ZookeeperInit(String zooKeeperUrl) {
+
+    int sessionTimeOut = 100000;
+    try {
+      zk = new ZooKeeper(zooKeeperUrl, sessionTimeOut, new DummyWatcher());
+
+    } catch (IOException e) {
+      // pass the exception along so that the stack trace is not lost
+      LOGGER.error(e, e.getMessage());
+    }
+
+  }
+
+  /**
+   * Returns the singleton, creating it on the first call. The url is only
+   * used by the call that creates the instance; later calls return the
+   * existing instance whatever url they pass.
+   *
+   * @param zooKeeperUrl connect string used when the instance is created
+   * @return the singleton instance
+   */
+  public static ZookeeperInit getInstance(String zooKeeperUrl) {
+
+    if (null == zooKeeperInit) {
+      synchronized (ZookeeperInit.class) {
+        if (null == zooKeeperInit) {
+          LOGGER.info("Initiating Zookeeper client.");
+          zooKeeperInit = new ZookeeperInit(zooKeeperUrl);
+        }
+      }
+    }
+    return zooKeeperInit;
+
+  }
+
+  /**
+   * @return the singleton instance, or null when getInstance(String) has
+   *         not created it yet
+   */
+  public static ZookeeperInit getInstance() {
+    return zooKeeperInit;
+  }
+
+  /**
+   * @return the zookeeper client, or null when its creation failed
+   */
+  public ZooKeeper getZookeeper() {
+    return zk;
+  }
+
+  /**
+   * Watcher that ignores every event.
+   */
+  private static class DummyWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java 
b/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
new file mode 100644
index 0000000..ea774d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/CarbonUnsafe.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.memory;
+
+import java.lang.reflect.Field;
+
+import sun.misc.Unsafe;
+
+
+/**
+ * Holds the sun.misc.Unsafe instance obtained reflectively from the
+ * "theUnsafe" field, together with the array base offsets of the primitive
+ * array types. When the instance cannot be obtained, unsafe is null and all
+ * offsets are 0.
+ */
+public final class CarbonUnsafe {
+
+  public static final int BYTE_ARRAY_OFFSET;
+
+  public static final int SHORT_ARRAY_OFFSET;
+
+  public static final int INT_ARRAY_OFFSET;
+
+  public static final int LONG_ARRAY_OFFSET;
+
+  public static final int DOUBLE_ARRAY_OFFSET;
+
+  public static Unsafe unsafe;
+
+  static {
+    Unsafe instance;
+    try {
+      Field theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe");
+      theUnsafeField.setAccessible(true);
+      instance = (Unsafe) theUnsafeField.get(null);
+    } catch (Throwable lookupFailure) {
+      // any failure of the reflective lookup leaves Unsafe unavailable
+      instance = null;
+    }
+    unsafe = instance;
+
+    boolean available = instance != null;
+    BYTE_ARRAY_OFFSET = available ? instance.arrayBaseOffset(byte[].class) : 0;
+    SHORT_ARRAY_OFFSET = available ? instance.arrayBaseOffset(short[].class) : 0;
+    INT_ARRAY_OFFSET = available ? instance.arrayBaseOffset(int[].class) : 0;
+    LONG_ARRAY_OFFSET = available ? instance.arrayBaseOffset(long[].class) : 0;
+    DOUBLE_ARRAY_OFFSET = available ? instance.arrayBaseOffset(double[].class) : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java 
b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
index c5a6087..2b63726 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java
@@ -23,8 +23,6 @@ import java.util.LinkedList;
 import java.util.Map;
 import javax.annotation.concurrent.GuardedBy;
 
-import org.apache.carbondata.core.unsafe.CarbonUnsafe;
-
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM 
long primitive array.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
index ae4cc0a..b81a7ba 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.memory;
 
-import org.apache.carbondata.core.unsafe.CarbonUnsafe;
-
 /**
  * Code ported from Apache Spark {org.apache.spark.unsafe.memory} package
  * A simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate 
off-heap memory.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
new file mode 100644
index 0000000..908fbc2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Identifier made of the store path and the carbon table identifier.
+ */
+public class AbsoluteTableIdentifier implements Serializable {
+
+  /**
+   * serializable version
+   */
+  private static final long serialVersionUID = 4695047103484427506L;
+
+  /**
+   * path of the store, as returned by FileFactory.getUpdatedFilePath
+   */
+  private String storePath;
+
+  /**
+   * carbon table identifier which carries the database name and the table
+   * name
+   */
+  private CarbonTableIdentifier carbonTableIdentifier;
+
+  /**
+   * @param storePath             path of the store; passed through
+   *                              FileFactory.getUpdatedFilePath before it is
+   *                              kept
+   * @param carbonTableIdentifier identifier of the table inside the store
+   */
+  public AbsoluteTableIdentifier(String storePath,
+      CarbonTableIdentifier carbonTableIdentifier) {
+    //TODO this should be moved to common place where path handling will be
+    // handled
+    this.storePath = FileFactory.getUpdatedFilePath(storePath);
+    this.carbonTableIdentifier = carbonTableIdentifier;
+  }
+
+  /**
+   * @return the storePath
+   */
+  public String getStorePath() {
+    return storePath;
+  }
+
+  /**
+   * @return the carbonTableIdentifier
+   */
+  public CarbonTableIdentifier getCarbonTableIdentifier() {
+    return carbonTableIdentifier;
+  }
+
+  /**
+   * Builds an identifier for the given table below the store path returned
+   * by CarbonUtil.getCarbonStorePath(), using an empty table id.
+   */
+  public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+    String defaultStorePath = CarbonUtil.getCarbonStorePath();
+    return new AbsoluteTableIdentifier(defaultStorePath,
+        new CarbonTableIdentifier(dbName, tableName, ""));
+  }
+
+  /**
+   * Builds an identifier from a table path whose last two segments are the
+   * database name and the table name; everything in front of them becomes
+   * the store path. The table id is the current time in milliseconds.
+   *
+   * @param tablePath table path, with either '\' or '/' as separator
+   * @throws IllegalArgumentException when the path splits into fewer than
+   *                                  three segments
+   */
+  public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
+    String normalizedPath = tablePath.replace('\\', '/');
+    String[] segments = normalizedPath.split("/");
+    int segmentCount = segments.length;
+    if (segmentCount < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath);
+    }
+
+    String dbName = segments[segmentCount - 2];
+    String tableName = segments[segmentCount - 1];
+    String dbAndTable =
+        dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName;
+    int storePathEnd = normalizedPath.lastIndexOf(dbAndTable);
+    String storePath = normalizedPath.substring(0, storePathEnd);
+
+    String tableId = Long.toString(System.currentTimeMillis());
+    return new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName, tableId));
+  }
+
+  /**
+   * @return store path, database name and table name joined with
+   *         File.separator
+   */
+  public String getTablePath() {
+    String dbName = getCarbonTableIdentifier().getDatabaseName();
+    String tableName = getCarbonTableIdentifier().getTableName();
+    return getStorePath() + File.separator + dbName + File.separator + tableName;
+  }
+
+  /**
+   * to get the hash code
+   */
+  @Override public int hashCode() {
+    int tableHash =
+        (carbonTableIdentifier == null) ? 0 : carbonTableIdentifier.hashCode();
+    int storeHash = (storePath == null) ? 0 : storePath.hashCode();
+    final int prime = 31;
+    return prime * (prime + tableHash) + storeHash;
+  }
+
+  /**
+   * to check this class is equal to
+   * other object passed
+   *
+   * @param obj other object
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || !(obj instanceof AbsoluteTableIdentifier)) {
+      return false;
+    }
+    AbsoluteTableIdentifier that = (AbsoluteTableIdentifier) obj;
+    boolean sameTable = (carbonTableIdentifier == null)
+        ? that.carbonTableIdentifier == null
+        : carbonTableIdentifier.equals(that.carbonTableIdentifier);
+    if (!sameTable) {
+      return false;
+    }
+    return (storePath == null)
+        ? that.storePath == null
+        : storePath.equals(that.storePath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
index c42bc93..46ef5ec 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.core.metadata;
 
 import java.util.BitSet;
 
-import 
org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 
 public class BlockletInfoColumnar {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
new file mode 100644
index 0000000..2f23669
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+
+/**
+ * Singleton registry of the carbon tables currently loaded, keyed by the
+ * lower-cased table unique name.
+ */
+public final class CarbonMetadata {
+
+  /**
+   * the single shared instance
+   */
+  private static final CarbonMetadata CARBONMETADATAINSTANCE = new CarbonMetadata();
+
+  /**
+   * loaded tables, keyed by lower-cased table unique name
+   */
+  private final Map<String, CarbonTable> tableInfoMap;
+
+  private CarbonMetadata() {
+    // a concurrent map is used because it will be updated by multiple threads
+    tableInfoMap = new ConcurrentHashMap<String, CarbonTable>();
+  }
+
+  public static CarbonMetadata getInstance() {
+    return CARBONMETADATAINSTANCE;
+  }
+
+  /**
+   * Removes the table registered under the given unique name, if any.
+   *
+   * @param tableUniquName table unique name (matched case-insensitively)
+   */
+  public void removeTable(String tableUniquName) {
+    tableInfoMap.remove(convertToLowerCase(tableUniquName));
+  }
+
+  /**
+   * Registers the given carbon table, replacing any table already stored under
+   * the same unique name.
+   * This method will be used in executor side as driver will always have
+   * updated table so from driver during query execution and data loading
+   * we just need to add the table
+   *
+   * @param carbonTable table to register
+   */
+  public void addCarbonTable(CarbonTable carbonTable) {
+    tableInfoMap.put(convertToLowerCase(carbonTable.getTableUniqueName()), carbonTable);
+  }
+
+  /**
+   * Builds a carbon table from the given table info and registers it, unless a
+   * table with the same unique name is already registered and is at least as
+   * recent as the given info.
+   *
+   * @param tableInfo table info to load
+   */
+  public void loadTableMetadata(TableInfo tableInfo) {
+    // compute the map key once; it is needed for both the lookup and the put
+    String key = convertToLowerCase(tableInfo.getTableUniqueName());
+    CarbonTable carbonTable = tableInfoMap.get(key);
+    if (null == carbonTable
+        || carbonTable.getTableLastUpdatedTime() < tableInfo.getLastUpdatedTime()) {
+      carbonTable = new CarbonTable();
+      carbonTable.loadCarbonTable(tableInfo);
+      tableInfoMap.put(key, carbonTable);
+    }
+  }
+
+  /**
+   * Returns the carbon table registered under the given unique name.
+   *
+   * @param tableUniqueName table unique name (matched case-insensitively)
+   * @return the registered table, or null when none is registered
+   */
+  public CarbonTable getCarbonTable(String tableUniqueName) {
+    return tableInfoMap.get(convertToLowerCase(tableUniqueName));
+  }
+
+  /**
+   * @return the number of tables currently registered
+   */
+  public int getNumberOfTables() {
+    return tableInfoMap.size();
+  }
+
+  /**
+   * Returns the given string in lower case. The root locale is used so that
+   * the resulting map keys do not depend on the JVM default locale (for
+   * example the Turkish dotless i).
+   *
+   * @param table string to convert
+   * @return the lower-cased string
+   */
+  public String convertToLowerCase(String table) {
+    return table.toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * Returns the dimension of the given table whose column id equals the given
+   * column identifier, searching child dimensions of complex dimensions too.
+   *
+   * @param carbonTable      table whose fact-table dimensions are searched
+   * @param columnIdentifier column id to look for
+   * @return the matching CarbonDimension, or null when there is no match
+   */
+  public CarbonDimension getCarbonDimensionBasedOnColIdentifier(CarbonTable carbonTable,
+      String columnIdentifier) {
+    List<CarbonDimension> listOfCarbonDims =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dimension : listOfCarbonDims) {
+      if (dimension.getColumnId().equals(columnIdentifier)) {
+        return dimension;
+      }
+      if (dimension.numberOfChild() > 0) {
+        CarbonDimension childDim =
+            getCarbonChildDimsBasedOnColIdentifier(columnIdentifier, dimension);
+        if (null != childDim) {
+          return childDim;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to get the dimension based on column identifier
+   * for complex dimension children. Children are searched depth first.
+   *
+   * @param columnIdentifier column identifier
+   * @param dimension        parent dimension
+   * @return the matching child dimension, or null when there is no match
+   */
+  private CarbonDimension getCarbonChildDimsBasedOnColIdentifier(String columnIdentifier,
+      CarbonDimension dimension) {
+    for (int i = 0; i < dimension.numberOfChild(); i++) {
+      // look the child up once per iteration instead of on every use
+      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+      if (child.getColumnId().equals(columnIdentifier)) {
+        return child;
+      } else if (child.numberOfChild() > 0) {
+        CarbonDimension childDim =
+            getCarbonChildDimsBasedOnColIdentifier(columnIdentifier, child);
+        if (null != childDim) {
+          return childDim;
+        }
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
new file mode 100644
index 0000000..7d2fb18
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata;
+
+import java.io.File;
+import java.io.Serializable;
+
+/**
+ * Identifies a table by database name, table name and table id.
+ */
+public class CarbonTableIdentifier implements Serializable {
+
+  /**
+   * name of the database the table belongs to
+   */
+  private String databaseName;
+
+  /**
+   * name of the table
+   */
+  private String tableName;
+
+  /**
+   * id of the table
+   */
+  private String tableId;
+
+  /**
+   * Creates an identifier from its three parts.
+   */
+  public CarbonTableIdentifier(String databaseName, String tableName, String tableId) {
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.tableId = tableId;
+  }
+
+  /**
+   * @return the database name
+   */
+  public String getDatabaseName() {
+    return this.databaseName;
+  }
+
+  /**
+   * @return the table name
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * @return the table id
+   */
+  public String getTableId() {
+    return this.tableId;
+  }
+
+  /**
+   * @return the table unique name, i.e. database name and table name joined by '_'
+   */
+  public String getTableUniqueName() {
+    return databaseName + '_' + tableName;
+  }
+
+  /**
+   * Creates the key for the bad record logger.
+   */
+  public String getBadRecordLoggerKey() {
+    // NOTE(review): the table name appears twice in this key; kept exactly as is
+    // because callers may build the same key elsewhere - confirm before changing
+    return databaseName + File.separator + tableName + '_' + tableName;
+  }
+
+  @Override public int hashCode() {
+    int result = 1;
+    result = 31 * result + hashOf(databaseName);
+    result = 31 * result + hashOf(tableId);
+    result = 31 * result + hashOf(tableName);
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    CarbonTableIdentifier that = (CarbonTableIdentifier) obj;
+    return sameValue(databaseName, that.databaseName)
+        && sameValue(tableId, that.tableId)
+        && sameValue(tableName, that.tableName);
+  }
+
+  /**
+   * @return the table unique name
+   */
+  @Override public String toString() {
+    return databaseName + '_' + tableName;
+  }
+
+  /**
+   * null-safe hash: 0 for null, otherwise the string's own hash code
+   */
+  private static int hashOf(String value) {
+    return value == null ? 0 : value.hashCode();
+  }
+
+  /**
+   * null-safe equality: two nulls are equal, null never equals a non-null value
+   */
+  private static boolean sameValue(String left, String right) {
+    return left == null ? right == null : left.equals(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnIdentifier.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnIdentifier.java
new file mode 100644
index 0000000..7ddc5a4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnIdentifier.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Unique identifier of a column. Equality and hashing consider the column id only.
+ */
+public class ColumnIdentifier implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * id of the column
+   */
+  private String columnId;
+
+  /**
+   * properties of the column, may be null
+   */
+  private Map<String, String> columnProperties;
+
+  /**
+   * data type of the column
+   */
+  private DataType dataType;
+
+  /**
+   * @param columnId         id of the column
+   * @param columnProperties properties of the column
+   * @param dataType         data type of the column
+   */
+  public ColumnIdentifier(String columnId, Map<String, String> columnProperties,
+      DataType dataType) {
+    this.columnId = columnId;
+    this.columnProperties = columnProperties;
+    this.dataType = dataType;
+  }
+
+  /**
+   * @return the column id
+   */
+  public String getColumnId() {
+    return this.columnId;
+  }
+
+  /**
+   * @param columnProperty name of the property to look up
+   * @return the property value, or null when no properties are set or the
+   * property is absent
+   */
+  public String getColumnProperty(String columnProperty) {
+    if (null == columnProperties) {
+      return null;
+    }
+    return columnProperties.get(columnProperty);
+  }
+
+  /**
+   * @return the data type of the column
+   */
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  @Override public int hashCode() {
+    int idHash = (columnId == null) ? 0 : columnId.hashCode();
+    return 31 + idHash;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ColumnIdentifier that = (ColumnIdentifier) obj;
+    if (columnId == null) {
+      return that.columnId == null;
+    }
+    return columnId.equals(that.columnId);
+  }
+
+  @Override public String toString() {
+    return "ColumnIdentifier [columnId=" + columnId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
new file mode 100644
index 0000000..fb15cc5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata;
+
+/**
+ * Versions of the columnar file format.
+ */
+public enum ColumnarFormatVersion {
+  V1((short) 1),
+  V2((short) 2);
+
+  /**
+   * numeric version carried by this constant; final because enum state must
+   * not change after construction
+   */
+  private final short version;
+
+  ColumnarFormatVersion(short version) {
+    this.version = version;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnarFormatV" + version;
+  }
+
+  /**
+   * @return the numeric version of this format
+   */
+  public short number() {
+    return version;
+  }
+
+  /**
+   * Maps a numeric version to its enum constant.
+   *
+   * @param version numeric version read from a file
+   * @return V1 for 0 and 1, V2 for every other value
+   */
+  public static ColumnarFormatVersion valueOf(short version) {
+    switch (version) {
+      case 0:
+        // before multiple reader support, for existing carbon file, it is version 1
+        return V1;
+      case 1:
+        // after multiple reader support, user can write new file with version 1
+        return V1;
+      default:
+        return V2;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
new file mode 100644
index 0000000..b7c477f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata.blocklet;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+
+/**
+ * Holds the information stored about one blocklet.
+ */
+public class BlockletInfo implements Serializable {
+
+  /**
+   * serialization id
+   */
+  private static final long serialVersionUID = 1873135459695635381L;
+
+  /**
+   * number of rows in this blocklet
+   */
+  private int numberOfRows;
+
+  /**
+   * dimension chunk information of all dimensions in this blocklet
+   */
+  private List<DataChunk> dimensionColumnChunk;
+
+  /**
+   * measure chunk information of all measures in this blocklet
+   */
+  private List<DataChunk> measureColumnChunk;
+
+  private List<Long> dimensionChunkOffsets;
+
+  private List<Short> dimensionChunksLength;
+
+  private List<Long> measureChunkOffsets;
+
+  private List<Short> measureChunksLength;
+
+  /**
+   * index of the blocklet: min, max, start key and end key of each column
+   */
+  private BlockletIndex blockletIndex;
+
+  /**
+   * @return the number of rows
+   */
+  public int getNumberOfRows() {
+    return this.numberOfRows;
+  }
+
+  /**
+   * @param numberOfRows the number of rows to set
+   */
+  public void setNumberOfRows(int numberOfRows) {
+    this.numberOfRows = numberOfRows;
+  }
+
+  /**
+   * @return the dimension column chunks
+   */
+  public List<DataChunk> getDimensionColumnChunk() {
+    return this.dimensionColumnChunk;
+  }
+
+  /**
+   * @param dimensionColumnChunk the dimension column chunks to set
+   */
+  public void setDimensionColumnChunk(List<DataChunk> dimensionColumnChunk) {
+    this.dimensionColumnChunk = dimensionColumnChunk;
+  }
+
+  /**
+   * @return the measure column chunks
+   */
+  public List<DataChunk> getMeasureColumnChunk() {
+    return this.measureColumnChunk;
+  }
+
+  /**
+   * @param measureColumnChunk the measure column chunks to set
+   */
+  public void setMeasureColumnChunk(List<DataChunk> measureColumnChunk) {
+    this.measureColumnChunk = measureColumnChunk;
+  }
+
+  /**
+   * @return the blocklet index
+   */
+  public BlockletIndex getBlockletIndex() {
+    return this.blockletIndex;
+  }
+
+  /**
+   * @param blockletIndex the blocklet index to set
+   */
+  public void setBlockletIndex(BlockletIndex blockletIndex) {
+    this.blockletIndex = blockletIndex;
+  }
+
+  /**
+   * @return the dimension chunk offsets
+   */
+  public List<Long> getDimensionChunkOffsets() {
+    return this.dimensionChunkOffsets;
+  }
+
+  /**
+   * @param dimensionChunkOffsets the dimension chunk offsets to set
+   */
+  public void setDimensionChunkOffsets(List<Long> dimensionChunkOffsets) {
+    this.dimensionChunkOffsets = dimensionChunkOffsets;
+  }
+
+  /**
+   * @return the dimension chunk lengths
+   */
+  public List<Short> getDimensionChunksLength() {
+    return this.dimensionChunksLength;
+  }
+
+  /**
+   * @param dimensionChunksLength the dimension chunk lengths to set
+   */
+  public void setDimensionChunksLength(List<Short> dimensionChunksLength) {
+    this.dimensionChunksLength = dimensionChunksLength;
+  }
+
+  /**
+   * @return the measure chunk offsets
+   */
+  public List<Long> getMeasureChunkOffsets() {
+    return this.measureChunkOffsets;
+  }
+
+  /**
+   * @param measureChunkOffsets the measure chunk offsets to set
+   */
+  public void setMeasureChunkOffsets(List<Long> measureChunkOffsets) {
+    this.measureChunkOffsets = measureChunkOffsets;
+  }
+
+  /**
+   * @return the measure chunk lengths
+   */
+  public List<Short> getMeasureChunksLength() {
+    return this.measureChunksLength;
+  }
+
+  /**
+   * @param measureChunksLength the measure chunk lengths to set
+   */
+  public void setMeasureChunksLength(List<Short> measureChunksLength) {
+    this.measureChunksLength = measureChunksLength;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
new file mode 100644
index 0000000..7b9d1e2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata.blocklet;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.BlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Footer information of one data file.
+ */
+public class DataFileFooter implements Serializable {
+
+  /**
+   * serialization id
+   */
+  private static final long serialVersionUID = -7284319972734500751L;
+
+  /**
+   * format version, used for data compatibility
+   */
+  private ColumnarFormatVersion versionId;
+
+  /**
+   * total number of rows in this file
+   */
+  private long numberOfRows;
+
+  /**
+   * segment info (will be same/repeated for all blocks in this segment)
+   */
+  private SegmentInfo segmentInfo;
+
+  /**
+   * information about leaf nodes of all columns in this file
+   */
+  private List<BlockletInfo> blockletList;
+
+  /**
+   * blocklet index of all blocklets in this file
+   */
+  private BlockletIndex blockletIndex;
+
+  /**
+   * description of the columns in this file
+   */
+  private List<ColumnSchema> columnInTable;
+
+  /**
+   * block info detail like file name, block index and locations
+   */
+  private BlockInfo blockInfo;
+
+  /**
+   * @return the format version
+   */
+  public ColumnarFormatVersion getVersionId() {
+    return this.versionId;
+  }
+
+  /**
+   * @param versionId the format version to set
+   */
+  public void setVersionId(ColumnarFormatVersion versionId) {
+    this.versionId = versionId;
+  }
+
+  /**
+   * @return the total number of rows
+   */
+  public long getNumberOfRows() {
+    return this.numberOfRows;
+  }
+
+  /**
+   * @param numberOfRows the total number of rows to set
+   */
+  public void setNumberOfRows(long numberOfRows) {
+    this.numberOfRows = numberOfRows;
+  }
+
+  /**
+   * @return the segment info
+   */
+  public SegmentInfo getSegmentInfo() {
+    return this.segmentInfo;
+  }
+
+  /**
+   * @param segmentInfo the segment info to set
+   */
+  public void setSegmentInfo(SegmentInfo segmentInfo) {
+    this.segmentInfo = segmentInfo;
+  }
+
+  /**
+   * @return the list of blocklets
+   */
+  public List<BlockletInfo> getBlockletList() {
+    return this.blockletList;
+  }
+
+  /**
+   * @param blockletList the list of blocklets to set
+   */
+  public void setBlockletList(List<BlockletInfo> blockletList) {
+    this.blockletList = blockletList;
+  }
+
+  /**
+   * @return the blocklet index
+   */
+  public BlockletIndex getBlockletIndex() {
+    return this.blockletIndex;
+  }
+
+  /**
+   * @param blockletIndex the blocklet index to set
+   */
+  public void setBlockletIndex(BlockletIndex blockletIndex) {
+    this.blockletIndex = blockletIndex;
+  }
+
+  /**
+   * @return the column schemas of this file
+   */
+  public List<ColumnSchema> getColumnInTable() {
+    return this.columnInTable;
+  }
+
+  /**
+   * @param columnInTable the column schemas to set
+   */
+  public void setColumnInTable(List<ColumnSchema> columnInTable) {
+    this.columnInTable = columnInTable;
+  }
+
+  /**
+   * @return the block info
+   */
+  public BlockInfo getBlockInfo() {
+    return this.blockInfo;
+  }
+
+  /**
+   * @param tableBlockInfo the block info to set
+   */
+  public void setBlockInfo(BlockInfo tableBlockInfo) {
+    this.blockInfo = tableBlockInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/SegmentInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/SegmentInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/SegmentInfo.java
new file mode 100644
index 0000000..a98f110
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/SegmentInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata.blocklet;
+
+import java.io.Serializable;
+
+/**
+ * Holds the column count and the column cardinalities of a segment.
+ */
+public class SegmentInfo implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = -1749874611112709431L;
+
+  /**
+   * number of columns in the segment
+   */
+  private int numberOfColumns;
+
+  /**
+   * cardinality of each column;
+   * a column which is not participating in the multidimensional key has
+   * cardinality -1
+   */
+  private int[] columnCardinality;
+
+  /**
+   * @return the number of columns
+   */
+  public int getNumberOfColumns() {
+    return this.numberOfColumns;
+  }
+
+  /**
+   * @param numberOfColumns the number of columns to set
+   */
+  public void setNumberOfColumns(int numberOfColumns) {
+    this.numberOfColumns = numberOfColumns;
+  }
+
+  /**
+   * @return the column cardinality array (the stored array itself, not a copy)
+   */
+  public int[] getColumnCardinality() {
+    return this.columnCardinality;
+  }
+
+  /**
+   * @param columnCardinality the column cardinality array to store (not copied)
+   */
+  public void setColumnCardinality(int[] columnCardinality) {
+    this.columnCardinality = columnCardinality;
+  }
+
+}


Reply via email to