http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
new file mode 100644
index 0000000..b24eaee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+
+public class CarbonColumn implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 3648269871256322681L;
+
+  /**
+   * column schema
+   */
+  protected ColumnSchema columnSchema;
+
+  /**
+   * table ordinal
+   */
+  protected int ordinal;
+
+  /**
+   * order of the column in the user's create table statement
+   */
+  protected int schemaOrdinal;
+  /**
+   * default value used in case of restructuring, when an older
+   * segment does not contain this particular column
+   */
+  protected byte[] defaultValue;
+
+  /**
+   * Column identifier
+   */
+  protected ColumnIdentifier columnIdentifier;
+
+  public CarbonColumn(ColumnSchema columnSchema, int ordinal, int schemaOrdinal) {
+    this.columnSchema = columnSchema;
+    this.ordinal = ordinal;
+    this.schemaOrdinal = schemaOrdinal;
+    this.columnIdentifier =
+     new ColumnIdentifier(getColumnId(), getColumnProperties(), getDataType());
+  }
+  /**
+   * @return columnar or row based
+   */
+  public boolean isColumnar() {
+    return columnSchema.isColumnar();
+  }
+
+  /**
+   * @return column unique id
+   */
+  public String getColumnId() {
+    return columnSchema.getColumnUniqueId();
+  }
+
+  /**
+   * @return the dataType
+   */
+  public DataType getDataType() {
+    return columnSchema.getDataType();
+  }
+
+  /**
+   * @return the colName
+   */
+  public String getColName() {
+    return columnSchema.getColumnName();
+  }
+
+  /**
+   * @return the ordinal
+   */
+  public int getOrdinal() {
+    return ordinal;
+  }
+
+  /**
+   * @return the list of encoder used in dimension
+   */
+  public List<Encoding> getEncoder() {
+    return columnSchema.getEncodingList();
+  }
+
+  /**
+   * @return column group id if it is row based
+   */
+  public int columnGroupId() {
+    return columnSchema.getColumnGroupId();
+  }
+
+  /**
+   * @return the defaultValue
+   */
+  public byte[] getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * @param defaultValue the defaultValue to set
+   */
+  public void setDefaultValue(byte[] defaultValue) {
+    this.defaultValue = defaultValue;
+  }
+
+  /**
+   * @param encoding
+   * @return true if contains the passing encoding
+   */
+  public boolean hasEncoding(Encoding encoding) {
+    return columnSchema.hasEncoding(encoding);
+  }
+
+  /**
+   * @return if DataType is ARRAY or STRUCT, this method return true, else
+   * false.
+   */
+  public Boolean isComplex() {
+    return columnSchema.isComplex();
+  }
+
+  /**
+   * @return if column is dimension return true, else false.
+   */
+  public Boolean isDimesion() {
+    return columnSchema.isDimensionColumn();
+  }
+
+  /**
+   * return the visibility
+   * @return
+   */
+  public boolean isInvisible() {
+    return columnSchema.isInvisible();
+  }
+
+  /**
+   * @return if column use inverted index return true, else false.
+   */
+  public Boolean isUseInvertedIndex() {
+    return columnSchema.isUseInvertedIndex();
+  }
+  public ColumnSchema getColumnSchema() {
+    return this.columnSchema;
+  }
+
+  /**
+   * @return columnproperty
+   */
+  public Map<String, String> getColumnProperties() {
+    return this.columnSchema.getColumnProperties();
+  }
+
+  /**
+   * @return columnIdentifier
+   */
+  public ColumnIdentifier getColumnIdentifier() {
+    return this.columnIdentifier;
+  }
+
+  public int getSchemaOrdinal() {
+    return this.schemaOrdinal;
+  }
+}
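
For context (not part of this patch), a minimal sketch of how a CarbonColumn might be built from a ColumnSchema. The column name, the use of Encoding.DICTIONARY and the ordinal values are illustrative assumptions, not something this commit defines:

import java.util.Arrays;
import java.util.UUID;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

public class CarbonColumnSketch {
  public static void main(String[] args) {
    // describe a plain dictionary-encoded dimension column (illustrative values)
    ColumnSchema schema = new ColumnSchema();
    schema.setColumnName("country");
    schema.setColumnUniqueId(UUID.randomUUID().toString());
    schema.setDataType(DataType.STRING);
    schema.setDimensionColumn(true);
    schema.setEncodingList(Arrays.asList(Encoding.DICTIONARY));

    // wrap it; ordinal and schemaOrdinal are both 0 here purely for illustration
    CarbonColumn column = new CarbonColumn(schema, 0, 0);
    System.out.println(column.getColName() + " dictionary encoded: "
        + column.hasEncoding(Encoding.DICTIONARY));
  }
}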

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
new file mode 100644
index 0000000..5f45b09
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+
+public class CarbonDimension extends CarbonColumn {
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 3648269871656322681L;
+
+  /**
+   * List of child dimension for complex type
+   */
+  private List<CarbonDimension> listOfChildDimensions;
+
+  /**
+   * in case of dictionary dimension this will store the ordinal
+   * of the dimension in mdkey
+   */
+  private int keyOrdinal;
+
+  /**
+   * column group column ordinal
+   * for example if column is second column in the group
+   * it will store 2
+   */
+  private int columnGroupOrdinal;
+
+  /**
+   * to store complex type dimension ordinal
+   */
+  private int complexTypeOrdinal;
+
+  public CarbonDimension(ColumnSchema columnSchema, int ordinal, int keyOrdinal,
+      int columnGroupOrdinal, int complexTypeOrdinal) {
+    this(columnSchema, ordinal, 0, keyOrdinal, columnGroupOrdinal, complexTypeOrdinal);
+  }
+
+  public CarbonDimension(ColumnSchema columnSchema, int ordinal, int schemaOrdinal, int keyOrdinal,
+      int columnGroupOrdinal, int complexTypeOrdinal) {
+    super(columnSchema, ordinal, schemaOrdinal);
+    this.keyOrdinal = keyOrdinal;
+    this.columnGroupOrdinal = columnGroupOrdinal;
+    this.complexTypeOrdinal = complexTypeOrdinal;
+  }
+
+  /**
+   * this method will initialize list based on number of child dimensions Count
+   */
+  public void initializeChildDimensionsList(int childDimension) {
+    listOfChildDimensions = new ArrayList<CarbonDimension>(childDimension);
+  }
+
+  /**
+   * @return number of children for complex type
+   */
+  public int getNumberOfChild() {
+    return columnSchema.getNumberOfChild();
+  }
+
+  /**
+   * @return list of children dims for complex type
+   */
+  public List<CarbonDimension> getListOfChildDimensions() {
+    return listOfChildDimensions;
+  }
+
+  /**
+   * @return return the number of child present in case of complex type
+   */
+  public int numberOfChild() {
+    return columnSchema.getNumberOfChild();
+  }
+
+  public boolean hasEncoding(Encoding encoding) {
+    return columnSchema.getEncodingList().contains(encoding);
+  }
+
+  /**
+   * @return the keyOrdinal
+   */
+  public int getKeyOrdinal() {
+    return keyOrdinal;
+  }
+
+  /**
+   * @return the columnGroupOrdinal
+   */
+  public int getColumnGroupOrdinal() {
+    return columnGroupOrdinal;
+  }
+
+  /**
+   * @return the complexTypeOrdinal
+   */
+  public int getComplexTypeOrdinal() {
+    return complexTypeOrdinal;
+  }
+
+  public void setComplexTypeOridnal(int complexTypeOrdinal) {
+    this.complexTypeOrdinal = complexTypeOrdinal;
+  }
+
+  /**
+   * to generate the hash code for this class
+   */
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((columnSchema == null) ? 0 : columnSchema.hashCode());
+    return result;
+  }
+
+  /**
+   * to check whether two dimensions are equal or not
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof CarbonDimension)) {
+      return false;
+    }
+    CarbonDimension other = (CarbonDimension) obj;
+    if (columnSchema == null) {
+      if (other.columnSchema != null) {
+        return false;
+      }
+    } else if (!columnSchema.equals(other.columnSchema)) {
+      return false;
+    }
+    return true;
+  }
+}
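
For context (not part of this patch), a sketch of how a complex ARRAY dimension and its child might be wired together via initializeChildDimensionsList(); the column names, ordinals and complex-type ordinals below are illustrative assumptions:

import java.util.UUID;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

public class CarbonDimensionSketch {
  public static void main(String[] args) {
    // parent ARRAY column (illustrative values)
    ColumnSchema arraySchema = new ColumnSchema();
    arraySchema.setColumnName("phoneNumbers");
    arraySchema.setColumnUniqueId(UUID.randomUUID().toString());
    arraySchema.setDataType(DataType.ARRAY);
    arraySchema.setDimensionColumn(true);
    arraySchema.setNumberOfChild(1);
    CarbonDimension arrayDim = new CarbonDimension(arraySchema, 0, -1, -1, 0);

    // single primitive child, named per the parent_column.value convention
    ColumnSchema childSchema = new ColumnSchema();
    childSchema.setColumnName("phoneNumbers.value");
    childSchema.setColumnUniqueId(UUID.randomUUID().toString());
    childSchema.setDataType(DataType.STRING);
    childSchema.setDimensionColumn(true);

    arrayDim.initializeChildDimensionsList(arrayDim.getNumberOfChild());
    arrayDim.getListOfChildDimensions().add(new CarbonDimension(childSchema, 1, -1, -1, 1));
    System.out.println("children: " + arrayDim.getListOfChildDimensions().size());
  }
}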

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
new file mode 100644
index 0000000..1e9880f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+
+/**
+ * This instance will be created for implicit column like tupleid.
+ */
+public class CarbonImplicitDimension extends CarbonDimension {
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 3648269871656322681L;
+
+  /**
+   * List of encoding that are chained to encode the data for this column
+   */
+  private List<Encoding> encodingList;
+
+  /**
+   * name of the implicit column, for example tupleId
+   */
+  private String implicitDimensionName;
+
+  public CarbonImplicitDimension(int ordinal, String implicitDimensionName) {
+    super(null, ordinal, -1, -1, -1);
+    encodingList = new ArrayList<Encoding>();
+    encodingList.add(Encoding.IMPLICIT);
+    this.implicitDimensionName = implicitDimensionName;
+  }
+
+  public boolean hasEncoding(Encoding encoding) {
+    return encodingList.contains(encoding);
+  }
+
+  /**
+   * @return column unique id
+   */
+  public String getColumnId() {
+    return UUID.randomUUID().toString();
+  }
+
+  public Map<String, String> getColumnProperties() {
+    return null;
+  }
+
+  /**
+   * @return if DataType is ARRAY or STRUCT, this method return true, else
+   * false.
+   */
+  public Boolean isComplex() {
+    return false;
+  }
+
+  /**
+   * @return row group id if it is row based
+   */
+  @Override public int columnGroupId() {
+    return -1;
+  }
+
+  /**
+   * @return the list of encoder used in dimension
+   */
+  @Override public List<Encoding> getEncoder() {
+    return encodingList;
+  }
+
+  /**
+   * @return return the number of child present in case of complex type
+   */
+  @Override public int numberOfChild() {
+    return 0;
+  }
+
+  /**
+   * @return the colName
+   */
+  @Override public String getColName() {
+    return implicitDimensionName;
+  }
+
+  /**
+   * @return if column is dimension return true, else false.
+   */
+  @Override public Boolean isDimesion() {
+    return true;
+  }
+
+  /**
+   * @return number of children for complex type
+   */
+  public int getNumberOfChild() {
+    return 0;
+  }
+
+  /**
+   * @return the dataType
+   */
+  @Override public DataType getDataType() {
+    return DataType.STRING;
+  }
+
+  /**
+   * @return columnar or row based
+   */
+  public boolean isColumnar() {
+    return true;
+  }
+
+  /**
+   * implicit columns are always invisible, so this returns true
+   */
+  public boolean isInvisible() {
+    return true;
+  }
+
+  /**
+   * to generate the hash code for this class
+   */
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((implicitDimensionName == null) ?
+        super.hashCode() :
+        super.hashCode() + implicitDimensionName.hashCode());
+    return result;
+  }
+
+  /**
+   * to check whether two dimensions are equal or not
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof CarbonImplicitDimension)) {
+      return false;
+    }
+    CarbonImplicitDimension other = (CarbonImplicitDimension) obj;
+    if (columnSchema == null) {
+      if (other.columnSchema != null) {
+        return false;
+      }
+    }
+    if (!getColName().equals(other.getColName())) {
+      return false;
+    }
+    return true;
+  }
+}
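
For context (not part of this patch), a sketch of an implicit dimension such as the tuple id column mentioned above; the ordinal and the name "tupleId" are illustrative assumptions:

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;

public class CarbonImplicitDimensionSketch {
  public static void main(String[] args) {
    // implicit columns carry no ColumnSchema; behaviour comes from the overrides above
    CarbonImplicitDimension tupleId = new CarbonImplicitDimension(-1, "tupleId");
    System.out.println(tupleId.hasEncoding(Encoding.IMPLICIT));   // true
    System.out.println(tupleId.getDataType() == DataType.STRING); // true
    System.out.println(tupleId.isInvisible());                    // true
  }
}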

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
new file mode 100644
index 0000000..3e006ad
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonMeasure.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+/**
+ * class represent column(measure) in table
+ */
+public class CarbonMeasure extends CarbonColumn {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 354341488059013977L;
+
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  /**
+   * precision in decimal data
+   */
+  private int precision;
+
+  public CarbonMeasure(ColumnSchema columnSchema, int ordinal) {
+    this(columnSchema, ordinal, 0);
+  }
+
+  public CarbonMeasure(ColumnSchema columnSchema, int ordinal, int schemaOrdinal) {
+    super(columnSchema, ordinal, schemaOrdinal);
+    this.scale = columnSchema.getScale();
+    this.precision = columnSchema.getPrecision();
+  }
+
+  /**
+   * @return the scale
+   */
+  public int getScale() {
+    return scale;
+  }
+
+  /**
+   * @return the precision
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * to check whether two measures are equal or not
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof CarbonMeasure)) {
+      return false;
+    }
+    CarbonMeasure other = (CarbonMeasure) obj;
+    if (columnSchema == null) {
+      if (other.columnSchema != null) {
+        return false;
+      }
+    } else if (!columnSchema.equals(other.columnSchema)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * hash code
+   * @return
+   */
+  @Override public int hashCode() {
+    return this.getColumnSchema().getColumnUniqueId().hashCode();
+  }
+}
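
For context (not part of this patch), a sketch of a decimal measure; it assumes the DataType enum exposes a DECIMAL member, and the column name, scale and precision used here are illustrative:

import java.util.UUID;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

public class CarbonMeasureSketch {
  public static void main(String[] args) {
    ColumnSchema salesSchema = new ColumnSchema();
    salesSchema.setColumnName("sales_amount");
    salesSchema.setColumnUniqueId(UUID.randomUUID().toString());
    salesSchema.setDataType(DataType.DECIMAL);
    salesSchema.setDimensionColumn(false);
    salesSchema.setScale(2);
    salesSchema.setPrecision(10);

    // scale and precision are copied out of the schema at construction time
    CarbonMeasure sales = new CarbonMeasure(salesSchema, 0);
    System.out.println(sales.getColName() + " decimal(" + sales.getPrecision()
        + "," + sales.getScale() + ")");
  }
}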

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
new file mode 100644
index 0000000..c99ab9a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+
+/**
+ * Store the information about the column metadata present in the table
+ */
+public class ColumnSchema implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 7676766554874863763L;
+
+  /**
+   * dataType
+   */
+  private DataType dataType;
+  /**
+   * Name of the column. If it is a complex data type, we follow a naming rule
+   * grand_parent_column.parent_column.child_column
+   * For Array types, two columns will be stored one for
+   * the array type and one for the primitive type with
+   * the name parent_column.value
+   */
+  private String columnName;
+
+  /**
+   * Unique ID for a column. If this is a dimension,
+   * it is the unique ID used in the dictionary
+   */
+  private String columnUniqueId;
+
+  /**
+   * column reference id
+   */
+  private String columnReferenceId;
+
+  /**
+   * whether it is stored as columnar format or row format
+   */
+  private boolean isColumnar = true;
+
+  /**
+   * List of encoding that are chained to encode the data for this column
+   */
+  private List<Encoding> encodingList;
+
+  /**
+   * Whether the column is a dimension or measure
+   */
+  private boolean isDimensionColumn;
+
+  /**
+   * Whether the column should use inverted index
+   */
+  private boolean useInvertedIndex = true;
+
+  /**
+   * The group ID of the column, used for row format columns,
+   * wherein the columns in each group are chunked together.
+   */
+  private int columnGroupId = -1;
+
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  private int precision;
+
+  private int schemaOrdinal;
+  /**
+   * Nested fields.  Since thrift does not support nested fields,
+   * the nesting is flattened to a single list by a depth-first traversal.
+   * The children count is used to construct the nested relationship.
+   * This field is not set when the element is a primitive type
+   */
+  private int numberOfChild;
+
+  /**
+   * used in case of schema restructuring
+   */
+  private byte[] defaultValue;
+
+  /**
+   * Column properties
+   */
+  private Map<String, String> columnProperties;
+
+  /**
+   * used to define the visibility of the column; default is false (visible)
+   */
+  private boolean invisible = false;
+
+  /**
+   * @return the columnName
+   */
+  public String getColumnName() {
+    return columnName;
+  }
+
+  /**
+   * @param columnName the columnName to set
+   */
+  public void setColumnName(String columnName) {
+    this.columnName = columnName;
+  }
+
+  /**
+   * @return the columnUniqueId
+   */
+  public String getColumnUniqueId() {
+    return columnUniqueId;
+  }
+
+  /**
+   * @param columnUniqueId the columnUniqueId to set
+   */
+  public void setColumnUniqueId(String columnUniqueId) {
+    this.columnUniqueId = columnUniqueId;
+  }
+
+  /**
+   * @return the isColumnar
+   */
+  public boolean isColumnar() {
+    return isColumnar;
+  }
+
+  /**
+   * @param isColumnar the isColumnar to set
+   */
+  public void setColumnar(boolean isColumnar) {
+    this.isColumnar = isColumnar;
+  }
+
+  /**
+   * @return the isDimensionColumn
+   */
+  public boolean isDimensionColumn() {
+    return isDimensionColumn;
+  }
+
+  /**
+   * @param isDimensionColumn the isDimensionColumn to set
+   */
+  public void setDimensionColumn(boolean isDimensionColumn) {
+    this.isDimensionColumn = isDimensionColumn;
+  }
+
+  /**
+   * the isUseInvertedIndex
+   */
+  public boolean isUseInvertedIndex() {
+    return useInvertedIndex;
+  }
+
+  /**
+   * @param useInvertedIndex the useInvertedIndex to set
+   */
+  public void setUseInvertedIndex(boolean useInvertedIndex) {
+    this.useInvertedIndex = useInvertedIndex;
+  }
+
+  /**
+   * @return the columnGroup
+   */
+  public int getColumnGroupId() {
+    return columnGroupId;
+  }
+
+  /**
+   */
+  public void setColumnGroup(int columnGroupId) {
+    this.columnGroupId = columnGroupId;
+  }
+
+  /**
+   * @return the scale
+   */
+  public int getScale() {
+    return scale;
+  }
+
+  /**
+   * @param scale the scale to set
+   */
+  public void setScale(int scale) {
+    this.scale = scale;
+  }
+
+  /**
+   * @return the precision
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * @param precision the precision to set
+   */
+  public void setPrecision(int precision) {
+    this.precision = precision;
+  }
+
+  /**
+   * @return the numberOfChild
+   */
+  public int getNumberOfChild() {
+    return numberOfChild;
+  }
+
+  /**
+   * @param numberOfChild the numberOfChild to set
+   */
+  public void setNumberOfChild(int numberOfChild) {
+    this.numberOfChild = numberOfChild;
+  }
+
+  /**
+   * @return the defaultValue
+   */
+  public byte[] getDefaultValue() {
+    return defaultValue;
+  }
+
+  /**
+   * @param defaultValue the defaultValue to set
+   */
+  public void setDefaultValue(byte[] defaultValue) {
+    this.defaultValue = defaultValue;
+  }
+
+  /**
+   * hash code method.
+   * only the column name is considered when generating the hash code
+   */
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((columnName == null) ? 0 : columnName.hashCode());
+    return result;
+  }
+
+  /**
+   * Overridden equals method for columnSchema
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof ColumnSchema)) {
+      return false;
+    }
+    ColumnSchema other = (ColumnSchema) obj;
+    if (columnName == null) {
+      if (other.columnName != null) {
+        return false;
+      }
+    } else if (!columnName.equals(other.columnName)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * @return the dataType
+   */
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * @param dataType the dataType to set
+   */
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  /**
+   * @return the encoderList
+   */
+  public List<Encoding> getEncodingList() {
+    return encodingList;
+  }
+
+  /**
+   */
+  public void setEncodingList(List<Encoding> encodingList) {
+    this.encodingList = encodingList;
+  }
+
+  /**
+   * @param encoding
+   * @return true if contains the passing encoding
+   */
+  public boolean hasEncoding(Encoding encoding) {
+    if (encodingList == null || encodingList.isEmpty()) {
+      return false;
+    } else {
+      return encodingList.contains(encoding);
+    }
+  }
+
+  /**
+   * @return if DataType is ARRAY or STRUCT, this method return true, else
+   * false.
+   */
+  public Boolean isComplex() {
+    if (DataType.ARRAY.equals(this.getDataType()) || DataType.STRUCT.equals(this.getDataType())) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * @param columnProperties
+   */
+  public void setColumnProperties(Map<String, String> columnProperties) {
+    this.columnProperties = columnProperties;
+  }
+
+  /**
+   * @param property
+   * @return
+   */
+  public String getColumnProperty(String property) {
+    if (null != columnProperties) {
+      return columnProperties.get(property);
+    }
+    return null;
+  }
+
+  /**
+   * return columnproperties
+   */
+  public Map<String, String> getColumnProperties() {
+    return columnProperties;
+  }
+  /**
+   * return the visibility
+   * @return
+   */
+  public boolean isInvisible() {
+    return invisible;
+  }
+
+  /**
+   * set the visibility
+   * @param invisible
+   */
+  public void setInvisible(boolean invisible) {
+    this.invisible = invisible;
+  }
+
+  /**
+   * @return columnReferenceId
+   */
+  public String getColumnReferenceId() {
+    return columnReferenceId;
+  }
+
+  /**
+   * @param columnReferenceId
+   */
+  public void setColumnReferenceId(String columnReferenceId) {
+    this.columnReferenceId = columnReferenceId;
+  }
+
+  public int getSchemaOrdinal() {
+    return schemaOrdinal;
+  }
+
+  public void setSchemaOrdinal(int schemaOrdinal) {
+    this.schemaOrdinal = schemaOrdinal;
+  }
+}
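
For context (not part of this patch), a sketch of the null-safe hasEncoding() check and the ARRAY/STRUCT based isComplex() check; the column name and the Encoding.DICTIONARY member are illustrative assumptions:

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

public class ColumnSchemaSketch {
  public static void main(String[] args) {
    ColumnSchema nested = new ColumnSchema();
    nested.setColumnName("address");
    nested.setDataType(DataType.STRUCT);
    nested.setDimensionColumn(true);

    System.out.println(nested.isComplex());                      // true: STRUCT or ARRAY
    System.out.println(nested.hasEncoding(Encoding.DICTIONARY)); // false: encoding list still null
  }
}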

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
new file mode 100644
index 0000000..fe38a56
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+
+/**
+ * This class contains all update utility methods
+ */
+public class CarbonUpdateUtil {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
+
+  /**
+   * returns the required field from the tuple id
+   *
+   * @param Tid
+   * @param tid
+   * @return
+   */
+  public static String getRequiredFieldFromTID(String Tid, TupleIdEnum tid) {
+    return Tid.split("/")[tid.getTupleIdIndex()];
+  }
+
+  /**
+   * returns segment along with block id
+   * @param Tid
+   * @return
+   */
+  public static String getSegmentWithBlockFromTID(String Tid) {
+    return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID)
+        + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, TupleIdEnum.BLOCK_ID);
+  }
+
+  /**
+   * Returns block path from tuple id
+   *
+   * @param tid
+   * @param factPath
+   * @return
+   */
+  public static String getTableBlockPath(String tid, String factPath) {
+    String part =
+            CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+    String segment =
+            CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
+    return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
+            + CarbonCommonConstants.FILE_SEPARATOR + segment;
+
+  }
+
+  /**
+   * returns delete delta file path
+   *
+   * @param blockPath
+   * @param blockName
+   * @param timestamp
+   * @return
+   */
+  public static String getDeleteDeltaFilePath(String blockPath, String blockName,
+                                              String timestamp) {
+    return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
+        + CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
+
+  }
+
+  /**
+   * @param updateDetailsList
+   * @param table
+   * @param updateStatusFileIdentifier
+   * @return
+   */
+  public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
+      CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
+    boolean status = false;
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
+    boolean lockStatus = false;
+
+    try {
+      lockStatus = updateLock.lockWithRetries();
+      if (lockStatus) {
+
+        AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                        absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        // read the existing file if present and update the same.
+        SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
+                .getUpdateStatusDetails();
+
+        List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails));
+
+        for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
+          int index = oldList.indexOf(newBlockEntry);
+          if (index != -1) {
+            // update the element in existing list.
+            SegmentUpdateDetails blockDetail = oldList.get(index);
+            if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || isCompaction) {
+              blockDetail
+                  .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
+            }
+            blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
+            blockDetail.setStatus(newBlockEntry.getStatus());
+            blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
+          } else {
+            // add the new details to the list.
+            oldList.add(newBlockEntry);
+          }
+        }
+
+        segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier);
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the segment update lock.");
+        status = false;
+      }
+    } catch (IOException e) {
+      status = false;
+    } finally {
+      if (lockStatus) {
+        if (updateLock.unlock()) {
+          LOGGER.info("Unlocked the segment update lock successfully.");
+        } else {
+          LOGGER.error("Not able to unlock the segment update lock.");
+        }
+      }
+    }
+    return status;
+  }
+
+  /**
+   *
+   * @param updatedSegmentsList
+   * @param table
+   * @param updatedTimeStamp
+   * @param isTimestampUpdationRequired
+   * @param segmentsToBeDeleted
+   * @return
+   */
+  public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
+                                                  CarbonTable table, String updatedTimeStamp,
+                                                  boolean isTimestampUpdationRequired,
+                                                  List<String> segmentsToBeDeleted) {
+
+    boolean status = false;
+
+    String metaDataFilepath = table.getMetaDataFilepath();
+
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    boolean lockStatus = false;
+    try {
+      lockStatus = carbonLock.lockWithRetries();
+      if (lockStatus) {
+        LOGGER.info(
+                "Acquired lock for table " + table.getDatabaseName() + "." + table.getFactTableName()
+                        + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+
+          if (isTimestampUpdationRequired) {
+            // we are storing the link between the 2 status files in the segment 0 only.
+            if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+              loadMetadata.setUpdateStatusFileName(
+                      CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+            }
+
+            // if the segment is in the list marked for delete then update the status.
+            if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
+              loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+              loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
+            }
+          }
+          for (String segName : updatedSegmentsList) {
+            if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
+              // if this call is coming from the delete delta flow then the timestamp
+              // String will come empty, so no need to write into the table status file.
+              if (isTimestampUpdationRequired) {
+                loadMetadata.setIsDeleted(CarbonCommonConstants.KEYWORD_TRUE);
+                // if in case of update flow.
+                if (loadMetadata.getUpdateDeltaStartTimestamp().isEmpty()) {
+                  // this means for first time it is getting updated .
+                  loadMetadata.setUpdateDeltaStartTimestamp(updatedTimeStamp);
+                }
+                // update end timestamp for each time.
+                loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
+              }
+            }
+          }
+        }
+
+        try {
+          segmentStatusManager
+                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        } catch (IOException e) {
+          return false;
+        }
+
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation 
for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+      }
+    } finally {
+      if (lockStatus) {
+        if (carbonLock.unlock()) {
+          LOGGER.info(
+                  "Table unlocked successfully after table status updation " + table.getDatabaseName()
+                          + "." + table.getFactTableName());
+        } else {
+          LOGGER.error(
+                  "Unable to unlock Table lock for table " + table.getDatabaseName() + "." + table
+                          .getFactTableName() + " during table status updation");
+        }
+      }
+    }
+    return status;
+
+  }
+
+  /**
+   * gets the file name of the update status file by appending the latest timestamp to it.
+   *
+   * @param updatedTimeStamp
+   * @return
+   */
+  public static String getUpdateStatusFileName(String updatedTimeStamp) {
+    return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
+            + updatedTimeStamp;
+  }
+
+  /**
+   * This will handle the clean up cases if the update fails.
+   *
+   * @param table
+   * @param timeStamp
+   */
+  public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStamp) {
+
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+    // as of now considering only partition 0.
+    String partitionId = "0";
+    String partitionDir = carbonTablePath.getPartitionDir(partitionId);
+    CarbonFile file =
+            FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
+    if (!file.exists()) {
+      return;
+    }
+    for (CarbonFile eachDir : file.listFiles()) {
+      // for each dir check if the file with the delta timestamp is present or not.
+      CarbonFile[] toBeDeleted = eachDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          String fileName = file.getName();
+          return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
+                  || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+                  || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+        }
+      });
+      // deleting the files of a segment.
+      try {
+        CarbonUtil.deleteFoldersAndFilesSilent(toBeDeleted);
+      } catch (IOException e) {
+        LOGGER.error("Exception in deleting the delta files." + e);
+      } catch (InterruptedException e) {
+        LOGGER.error("Exception in deleting the delta files." + e);
+      }
+    }
+  }
+
+  /**
+   * returns timestamp as long value
+   *
+   * @param timestamp
+   * @return
+   */
+  public static Long getTimeStampAsLong(String timestamp) {
+    try {
+      long longValue = Long.parseLong(timestamp);
+      return longValue;
+    } catch (NumberFormatException nfe) {
+      String errorMsg = "Invalid timestamp : " + timestamp;
+      LOGGER.error(errorMsg);
+      return null;
+    }
+  }
+
+  /**
+   * returns integer value from given string
+   *
+   * @param value
+   * @return
+   * @throws Exception
+   */
+  public static Integer getIntegerValue(String value) throws Exception {
+    try {
+      int intValue = Integer.parseInt(value);
+      return intValue;
+    } catch (NumberFormatException nfe) {
+      LOGGER.error("Invalid row : " + value + nfe.getLocalizedMessage());
+      throw new Exception("Invalid row : " + nfe.getLocalizedMessage());
+    }
+  }
+
+  /**
+   * return only block name from completeBlockName
+   *
+   * @param completeBlockName
+   * @return
+   */
+  public static String getBlockName(String completeBlockName) {
+    String blockName =
+        completeBlockName.substring(0, completeBlockName.lastIndexOf(CarbonCommonConstants.HYPHEN));
+    return blockName;
+  }
+
+  /**
+   * returns segment id from segment name
+   *
+   * @param segmentName
+   * @return
+   */
+  public static String getSegmentId(String segmentName) {
+    String id = segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
+    return id;
+  }
+
+  public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+
+    // scan all the carbondata files and get the latest task ID.
+    CarbonFile segment =
+            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    int max = 0;
+    if (null != dataFiles) {
+      for (CarbonFile file : dataFiles) {
+        int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
+        if (taskNumber > max) {
+          max = taskNumber;
+        }
+      }
+    }
+    // return max task No
+    return max;
+
+  }
+
+  public static String getLatestBlockNameForSegment(String segmentId, CarbonTablePath tablePath) {
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+
+    // scan all the carbondata files and get the latest task ID.
+    CarbonFile segment =
+            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        int max = 0;
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          int taskNumber =
+              Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
+          if (taskNumber >= max) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    // get the latest among the data files. highest task number will be at the last.
+    return dataFiles[dataFiles.length - 1].getName();
+  }
+
+  /**
+   * This method will convert a given timestamp to a long value and then back to a string
+   *
+   * @param factTimeStamp
+   * @return
+   */
+  public static String convertTimeStampToString(String factTimeStamp) {
+    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return Long.toString(dateToStr.getTime());
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * This method will convert a given timestamp string to a long value
+   *
+   * @param factTimeStamp
+   * @return
+   */
+  public static long convertTimeStampToLong(String factTimeStamp) {
+    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return dateToStr.getTime();
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type 
value" + e.getMessage());
+      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      try {
+        dateToStr = parser.parse(factTimeStamp);
+        return dateToStr.getTime();
+      } catch (ParseException e1) {
+        LOGGER
+            .error("Cannot convert " + factTimeStamp + " to Time/Long type value " + e1.getMessage());
+        return 0;
+      }
+    }
+  }
+
+
+  /**
+   * Handles the clean up of old carbondata files, index files, delete delta files,
+   * and update status files.
+   * @param table clean up will be handled on this table.
+   * @param forceDelete if true then the max query execution timeout will not be considered.
+   */
+  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) {
+
+    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
+                    table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+
+    LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
+
+    String validUpdateStatusFile = "";
+
+    // scan through each segment.
+
+    for (LoadMetadataDetails segment : details) {
+
+      // take the update status file name from 0th segment.
+      validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
+
+      // if this segment is valid then only we will go for delta file deletion.
+      // if the segment is marked for delete or compacted then anyway it will get deleted.
+
+      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+              || segment.getLoadStatus()
+              .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+
+        // take the list of files from this segment.
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        CarbonFile segDir =
+                FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+        CarbonFile[] allSegmentFiles = segDir.listFiles();
+
+        // scan through the segment and find the carbondata files and index files.
+        SegmentUpdateStatusManager updateStatusManager =
+                new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+
+        // get Invalid update  delta files.
+        CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
+                .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                        CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+
+        // now for each invalid delta file need to check the query execution time out
+        // and then delete.
+
+        for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
+
+          compareTimestampsAndDelete(invalidFile, forceDelete, false);
+        }
+
+        // do the same for the index files.
+        CarbonFile[] invalidIndexFiles = updateStatusManager
+                .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                        CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+
+        // now for each invalid index file need to check the query execution time out
+        // and then delete.
+
+        for (CarbonFile invalidFile : invalidIndexFiles) {
+
+          compareTimestampsAndDelete(invalidFile, forceDelete, false);
+        }
+
+        // now handle all the delete delta files which needs to be deleted.
+        // there are 2 cases here .
+        // 1. if the block is marked as compacted then the corresponding delta files
+        //    can be deleted if query exec timeout is done.
+        // 2. if the block is in success state then also there can be delete
+        //    delta compaction happened and old files can be deleted.
+
+        SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
+        for (SegmentUpdateDetails block : updateDetails) {
+          CarbonFile[] completeListOfDeleteDeltaFiles;
+          CarbonFile[] invalidDeleteDeltaFiles;
+
+          if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
+            continue;
+          }
+
+          // case 1
+          if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+            completeListOfDeleteDeltaFiles = updateStatusManager
+                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
+                            allSegmentFiles);
+            for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+
+            CarbonFile[] blockRelatedFiles = updateStatusManager
+                    .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
+                            block.getActualBlockName());
+
+            // now for each invalid index file need to check the query execution time out
+            // and then delete.
+
+            for (CarbonFile invalidFile : blockRelatedFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+
+
+          } else {
+            invalidDeleteDeltaFiles = updateStatusManager
+                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+                            allSegmentFiles);
+            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+          }
+        }
+      }
+    }
+
+    // delete the update table status files which are old.
+    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
+
+      final String updateStatusTimestamp = validUpdateStatusFile
+              .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
+
+      CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(),
+              FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
+
+      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
+
+            // CHECK if this is valid or not.
+            // we only send invalid ones to delete.
+            if (!file.getName().endsWith(updateStatusTimestamp)) {
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+
+      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
+
+        compareTimestampsAndDelete(invalidFile, forceDelete, true);
+      }
+    }
+  }
+
+  /**
+   * This will tell whether the max query timeout has expired or not.
+   * @param fileTimestamp
+   * @return
+   */
+  public static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
+    // record current time.
+    long currentTime = CarbonUpdateUtil.readCurrentTime();
+    int maxTime;
+    try {
+      maxTime = Integer.parseInt(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+    } catch (NumberFormatException e) {
+      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+    }
+
+    long difference = currentTime - fileTimestamp;
+
+    long minutesElapsed = (difference / (1000 * 60));
+
+    return minutesElapsed > maxTime;
+
+  }
+
+  /**
+   *
+   * @param invalidFile
+   * @param forceDelete
+   * @param isUpdateStatusFile if true then the file name parsing logic changes.
+   */
+  private static void compareTimestampsAndDelete(CarbonFile invalidFile,
+                                                 boolean forceDelete, boolean isUpdateStatusFile) {
+    long fileTimestamp = 0L;
+
+    if (isUpdateStatusFile) {
+      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
+              .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
+    } else {
+      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
+              CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
+    }
+
+    // if the file timestamp is older than the current time by more than the query execution
+    // timeout, then delete that file.
+    if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+      // delete the files.
+      try {
+        LOGGER.info("deleting the invalid file : " + invalidFile.getName());
+        CarbonUtil.deleteFoldersAndFiles(invalidFile);
+      } catch (IOException e) {
+        LOGGER.error("error in clean up of compacted files." + e.getMessage());
+      } catch (InterruptedException e) {
+        LOGGER.error("error in clean up of compacted files." + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   *
+   * @param blockStatus
+   * @return
+   */
+  public static boolean isBlockInvalid(String blockStatus) {
+    if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
+            .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This will return the current time in millis.
+   * @return
+   */
+  public static long readCurrentTime() {
+    return System.currentTimeMillis();
+  }
+
+  /**
+   *
+   * @param details
+   * @param segmentBlockCount
+   */
+  public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
+                                                Map<String, Long> segmentBlockCount) {
+
+    String segId = details.getSegmentName();
+
+    segmentBlockCount.put(details.getSegmentName(), 
segmentBlockCount.get(segId) - 1);
+
+  }
+
+  /**
+   * Returns the segments whose remaining block count has dropped to zero,
+   * i.e. the segments that can be marked as deleted.
+   * @param segmentBlockCount mapping of segment id to its remaining block count
+   * @return list of segment ids to be marked as deleted
+   */
+  public static List<String> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
+    List<String> segmentsToBeDeleted =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
+
+      if (eachSeg.getValue() == 0) {
+        segmentsToBeDeleted.add(eachSeg.getKey());
+      }
+
+    }
+    return segmentsToBeDeleted;
+  }
+
+  /**
+   * Builds the complete block row detail map (total row count and already deleted row count
+   * per block) and sets it on the given BlockMappingVO.
+   * @param blockMappingVO VO holding the block to row count mapping
+   * @param segmentUpdateStatusManager manager used to look up the deleted row count of a block
+   */
+  public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
+                                           SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
+
+    Map<String, RowCountDetailsVO> outputMap =
+            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
+      String key = blockRowEntry.getKey();
+      long alreadyDeletedCount = 0;
+
+      SegmentUpdateDetails detail = segmentUpdateStatusManager.getDetailsForABlock(key);
+
+      if (null != detail) {
+
+        alreadyDeletedCount = Long.parseLong(detail.getDeletedRowsInBlock());
+
+      }
+
+      RowCountDetailsVO rowCountDetailsVO =
+              new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
+      outputMap.put(key, rowCountDetailsVO);
+
+    }
+
+    blockMappingVO.setCompleteBlockRowDetailVO(outputMap);
+
+  }
+
+  /**
+   * Forms the key used to identify a block within a segment.
+   * @param segID segment id
+   * @param blockName carbondata block file name
+   * @return key of the form segment id + separator + block name without the part prefix
+   */
+  public static String getSegmentBlockNameKey(String segID, String blockName) {
+
+    String blockNameWithOutPart = blockName
+            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+
+    return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
+
+  }
+
+}
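
A minimal usage sketch of the utility methods above (not part of the patch): the segment id, block file name and elapsed time are invented illustration values, and CarbonUpdateUtil is assumed to live in org.apache.carbondata.core.mutate alongside the other new classes in this commit.

    import org.apache.carbondata.core.mutate.CarbonUpdateUtil;

    public class UpdateUtilSketch {
      public static void main(String[] args) {
        // key of the form <segmentId><separator><block name without the "part" prefix>
        String key = CarbonUpdateUtil.getSegmentBlockNameKey("2",
            "part-0-0-1498547249000.carbondata");

        // a stale update status/delta file may only be removed once the configured
        // max query execution time has elapsed, so that running queries can still read it
        long twoHoursOldTimestamp = CarbonUpdateUtil.readCurrentTime() - 2L * 60 * 60 * 1000;
        boolean deletable = CarbonUpdateUtil.isMaxQueryTimeoutExceeded(twoHoursOldTimestamp);

        System.out.println(key + ", deletable=" + deletable);
      }
    }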

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
new file mode 100644
index 0000000..65b999c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockDetails.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class stores the block details of delete delta file
+ */
+public class DeleteDeltaBlockDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918495724L;
+
+  private List<DeleteDeltaBlockletDetails> blockletDetails;
+  private String blockName;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteDeltaBlockDetails.class.getName());
+
+  public DeleteDeltaBlockDetails(String blockName) {
+    this.blockName = blockName;
+    blockletDetails = new ArrayList<DeleteDeltaBlockletDetails>();
+  }
+
+  public String getBlockName() {
+    return blockName;
+  }
+
+  public void setBlockName(String blockName) {
+    this.blockName = blockName;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || !(obj instanceof DeleteDeltaBlockDetails)) return false;
+
+    DeleteDeltaBlockDetails that = (DeleteDeltaBlockDetails) obj;
+
+    return blockName.equals(that.blockName);
+
+  }
+
+  @Override public int hashCode() {
+    return blockName.hashCode();
+  }
+
+  public List<DeleteDeltaBlockletDetails> getBlockletDetails() {
+    return blockletDetails;
+  }
+
+  public boolean addBlockletDetails(DeleteDeltaBlockletDetails blocklet) {
+    int index = blockletDetails.indexOf(blocklet);
+    if (index == -1) {
+      return blockletDetails.add(blocklet);
+    } else {
+      return blockletDetails.get(index).addDeletedRows(blocklet.getDeletedRows());
+    }
+  }
+
+  public boolean addBlocklet(String blockletId, String offset) throws Exception {
+    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId);
+    try {
+      blocklet.addDeletedRow(CarbonUpdateUtil.getIntegerValue(offset));
+      return addBlockletDetails(blocklet);
+    } catch (Exception e) {
+      LOGGER.debug(e.getMessage());
+      throw e;
+    }
+
+  }
+}
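
A hedged sketch (not part of the patch) of how DeleteDeltaBlockDetails accumulates deleted-row offsets. The block name, blocklet ids and offsets are invented, and addBlocklet is assumed to accept the row offset as a numeric string, as the code above suggests.

    import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;

    public class DeleteDeltaBlockSketch {
      public static void main(String[] args) throws Exception {
        DeleteDeltaBlockDetails block = new DeleteDeltaBlockDetails("part-0-0-1498547249000");
        block.addBlocklet("0", "3");
        block.addBlocklet("0", "7");   // same blocklet id: offsets are merged into one entry
        block.addBlocklet("1", "2");
        System.out.println(block.getBlockletDetails().size());  // 2 blocklet entries
      }
    }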

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
new file mode 100644
index 0000000..48f80df
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaBlockletDetails.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class stores the blocklet details of delete delta file
+ */
+public class DeleteDeltaBlockletDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914911491724L;
+  private String id;
+  private Set<Integer> deletedRows;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
+
+  public DeleteDeltaBlockletDetails(String id) {
+    this.id = id;
+    deletedRows = new TreeSet<Integer>();
+  }
+
+  public boolean addDeletedRows(Set<Integer> rows) {
+    return deletedRows.addAll(rows);
+  }
+
+  public boolean addDeletedRow(Integer row) {
+    return deletedRows.add(row);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public Set<Integer> getDeletedRows() {
+    return deletedRows;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || !(obj instanceof DeleteDeltaBlockletDetails)) {
+      return false;
+    }
+
+    DeleteDeltaBlockletDetails that = (DeleteDeltaBlockletDetails) obj;
+    return id.equals(that.id);
+  }
+
+  @Override public int hashCode() {
+    return id.hashCode();
+  }
+
+}
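
A small sketch (not part of the patch) showing that the TreeSet-backed DeleteDeltaBlockletDetails keeps deleted row offsets sorted and de-duplicated; the ids and offsets are invented values.

    import java.util.Arrays;
    import java.util.TreeSet;

    import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;

    public class BlockletDetailsSketch {
      public static void main(String[] args) {
        DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails("0");
        blocklet.addDeletedRow(5);
        blocklet.addDeletedRow(5);                                           // duplicate is ignored
        blocklet.addDeletedRows(new TreeSet<Integer>(Arrays.asList(9, 1)));  // bulk add
        System.out.println(blocklet.getDeletedRows());                       // [1, 5, 9]
      }
    }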

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
new file mode 100644
index 0000000..e75be5e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class stores the segment details of table update status file
+ */
+public class SegmentUpdateDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918491724L;
+  private String segmentName;
+  private String blockName;
+  private String status = "";
+  private String deleteDeltaEndTimestamp = "";
+  private String deleteDeltaStartTimestamp = "";
+  private String actualBlockName;
+  private String deletedRowsInBlock = "0";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SegmentUpdateDetails.class.getName());
+
+  public String getDeleteDeltaEndTimestamp() {
+    return deleteDeltaEndTimestamp;
+  }
+
+  public void setDeleteDeltaEndTimestamp(String deleteDeltaEndTimestamp) {
+    this.deleteDeltaEndTimestamp = deleteDeltaEndTimestamp;
+  }
+
+  public String getSegmentName() {
+    return segmentName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    this.segmentName = segmentName;
+  }
+
+  public String getBlockName() {
+    return blockName;
+  }
+
+  public void setBlockName(String blockName) {
+    this.blockName = blockName;
+  }
+
+  public String getDeleteDeltaStartTimestamp() {
+    return deleteDeltaStartTimestamp;
+  }
+
+  public void setDeleteDeltaStartTimestamp(String deleteDeltaStartTimestamp) {
+    this.deleteDeltaStartTimestamp = deleteDeltaStartTimestamp;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getStatus() {
+    return this.status;
+  }
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = segmentName.hashCode();
+    result = prime * result + blockName.hashCode();
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof SegmentUpdateDetails)) {
+      return false;
+    }
+    SegmentUpdateDetails other = (SegmentUpdateDetails) obj;
+    if (segmentName == null) {
+      if (other.segmentName != null) {
+        return false;
+      }
+    } else if (!segmentName.equals(other.segmentName)) {
+      return false;
+    }
+    if (blockName == null) {
+      if (other.blockName != null) {
+        return false;
+      }
+    } else if (!blockName.equals(other.blockName)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * returns the delete delta end timestamp as a long value
+   *
+   * @return delete delta end time in millis, or 0 if the stored value is not a valid number
+   */
+  public long getDeleteDeltaEndTimeAsLong() {
+    return getTimeStampAsLong(deleteDeltaEndTimestamp);
+  }
+
+  /**
+   * returns the delete delta start timestamp as a long value
+   *
+   * @return delete delta start time in millis, or 0 if the stored value is not a valid number
+   */
+  public long getDeleteDeltaStartTimeAsLong() {
+    return getTimeStampAsLong(deleteDeltaStartTimestamp);
+  }
+
+  /**
+   * returns complete block name
+   *
+   * @return the actual (complete) block file name
+   */
+  public String getActualBlockName() {
+    return actualBlockName;
+  }
+
+  public void setActualBlockName(String actualBlockName) {
+    this.actualBlockName = actualBlockName;
+  }
+
+  /**
+   * returns timestamp as long value
+   *
+   * @param timestamp timestamp string to parse
+   * @return parsed value, or 0 if the string is not a valid number
+   */
+  private long getTimeStampAsLong(String timestamp) {
+    long longValue = 0;
+    try {
+      longValue = Long.parseLong(timestamp);
+    } catch (NumberFormatException nfe) {
+      String errorMsg = "Invalid timestamp : " + timestamp;
+      LOGGER.debug(errorMsg);
+    }
+    return longValue;
+  }
+
+  public String getDeletedRowsInBlock() {
+    return deletedRowsInBlock;
+  }
+
+  public void setDeletedRowsInBlock(String deletedRowsInBlock) {
+    this.deletedRowsInBlock = deletedRowsInBlock;
+  }
+}
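
Sketch (not part of the patch): equals/hashCode above are based only on segmentName and blockName, so collections de-duplicate update entries per block regardless of the other fields. The values below are invented.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.carbondata.core.mutate.SegmentUpdateDetails;

    public class SegmentUpdateDetailsSketch {
      public static void main(String[] args) {
        SegmentUpdateDetails first = new SegmentUpdateDetails();
        first.setSegmentName("0");
        first.setBlockName("part-0-0");
        first.setDeleteDeltaStartTimestamp("1498547249000");

        SegmentUpdateDetails second = new SegmentUpdateDetails();
        second.setSegmentName("0");
        second.setBlockName("part-0-0");

        Set<SegmentUpdateDetails> details = new HashSet<>();
        details.add(first);
        details.add(second);  // rejected: same (segmentName, blockName) pair
        System.out.println(details.size() + ", start=" + first.getDeleteDeltaStartTimeAsLong());
      }
    }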

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
new file mode 100644
index 0000000..b986b72
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/TupleIdEnum.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.mutate;
+
+/**
+ * Enum class for tupleID.
+ */
+public enum TupleIdEnum {
+  PART_ID(0),
+  SEGMENT_ID(1),
+  BLOCK_ID(2),
+  BLOCKLET_ID(3),
+  OFFSET(4);
+
+  private int index;
+
+  TupleIdEnum(int index) {
+    this.index = index;
+  }
+
+  public int getTupleIdIndex() {
+    return this.index;
+  }
+
+}
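
Sketch (not part of the patch) of indexing into a tuple id with the enum above. The separator and the exact tuple id layout are assumptions for illustration; the real format is defined by the tuple id generation code, which is not in this diff.

    import org.apache.carbondata.core.mutate.TupleIdEnum;

    public class TupleIdSketch {
      public static void main(String[] args) {
        // hypothetical tuple id: partId/segmentId/blockId/blockletId/offset
        String[] parts = "0/2/part-0-0/1/25".split("/");
        String segmentId = parts[TupleIdEnum.SEGMENT_ID.getTupleIdIndex()];
        String offset = parts[TupleIdEnum.OFFSET.getTupleIdIndex()];
        System.out.println("segment=" + segmentId + ", offset=" + offset);
      }
    }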

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/UpdateVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/UpdateVO.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/UpdateVO.java
new file mode 100644
index 0000000..9f13ed1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/UpdateVO.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.mutate;
+
+import java.io.Serializable;
+
+/**
+ * VO class for storing details related to Update operation.
+ */
+public class UpdateVO implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private Long factTimestamp;
+
+  private Long updateDeltaStartTimestamp;
+
+  private String segmentId;
+
+  private Long latestUpdateTimestamp;
+
+  public Long getLatestUpdateTimestamp() {
+    return latestUpdateTimestamp;
+  }
+
+  public void setLatestUpdateTimestamp(Long latestUpdateTimestamp) {
+    this.latestUpdateTimestamp = latestUpdateTimestamp;
+  }
+
+  public Long getFactTimestamp() {
+    return factTimestamp;
+  }
+
+  public void setFactTimestamp(Long factTimestamp) {
+    this.factTimestamp = factTimestamp;
+  }
+
+  public Long getUpdateDeltaStartTimestamp() {
+    return updateDeltaStartTimestamp;
+  }
+
+  public void setUpdateDeltaStartTimestamp(Long updateDeltaStartTimestamp) {
+    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    UpdateVO updateVO = (UpdateVO) o;
+    if (factTimestamp != null ?
+        !factTimestamp.equals(updateVO.factTimestamp) :
+        updateVO.factTimestamp != null) {
+      return false;
+    }
+    if (updateDeltaStartTimestamp != null ?
+        !updateDeltaStartTimestamp.equals(updateVO.updateDeltaStartTimestamp) :
+        updateVO.updateDeltaStartTimestamp != null) {
+      return false;
+    }
+    return latestUpdateTimestamp != null ?
+        latestUpdateTimestamp.equals(updateVO.latestUpdateTimestamp) :
+        updateVO.latestUpdateTimestamp == null;
+
+  }
+
+  @Override public int hashCode() {
+    int result = factTimestamp != null ? factTimestamp.hashCode() : 0;
+    result = 31 * result + (updateDeltaStartTimestamp != null ?
+        updateDeltaStartTimestamp.hashCode() :
+        0);
+    result = 31 * result + (latestUpdateTimestamp != null ? latestUpdateTimestamp.hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * This will return the update timestamp if it is present, else the fact timestamp.
+   * @return latest update timestamp, or the fact (load) timestamp if never updated
+   */
+  public Long getCreatedOrUpdatedTimeStamp() {
+    if (null == latestUpdateTimestamp) {
+      return factTimestamp;
+    }
+    return latestUpdateTimestamp;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+}
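
Sketch (not part of the patch) of the fallback in getCreatedOrUpdatedTimeStamp: until an update happens, the fact (load) timestamp is returned. The timestamps are invented values.

    import org.apache.carbondata.core.mutate.UpdateVO;

    public class UpdateVOSketch {
      public static void main(String[] args) {
        UpdateVO vo = new UpdateVO();
        vo.setSegmentId("0");
        vo.setFactTimestamp(1498547249000L);
        System.out.println(vo.getCreatedOrUpdatedTimeStamp());  // 1498547249000 (no update yet)

        vo.setLatestUpdateTimestamp(1498547300000L);
        System.out.println(vo.getCreatedOrUpdatedTimeStamp());  // 1498547300000
      }
    }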

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
new file mode 100644
index 0000000..e9ce73f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate.data;
+
+import java.util.Map;
+
+/**
+ * VO class to store the segment to block count mapping and the block to row count mapping.
+ */
+public class BlockMappingVO {
+
+  private Map<String, Long> blockRowCountMapping;
+
+  private Map<String, Long> segmentNumberOfBlockMapping;
+
+  private Map<String, RowCountDetailsVO> completeBlockRowDetailVO;
+
+  public void setCompleteBlockRowDetailVO(
+      Map<String, RowCountDetailsVO> completeBlockRowDetailVO) {
+    this.completeBlockRowDetailVO = completeBlockRowDetailVO;
+  }
+
+  public Map<String, RowCountDetailsVO> getCompleteBlockRowDetailVO() {
+    return completeBlockRowDetailVO;
+  }
+
+  public Map<String, Long> getBlockRowCountMapping() {
+    return blockRowCountMapping;
+  }
+
+  public Map<String, Long> getSegmentNumberOfBlockMapping() {
+    return segmentNumberOfBlockMapping;
+  }
+
+  public BlockMappingVO(Map<String, Long> blockRowCountMapping,
+      Map<String, Long> segmentNumberOfBlockMapping) {
+    this.blockRowCountMapping = blockRowCountMapping;
+    this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping;
+  }
+}
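
Sketch (not part of the patch) wiring a BlockMappingVO by hand. In the real flow the complete row detail map is filled by CarbonUpdateUtil.createBlockDetailsMap from the segment update status; the keys and counts below are invented.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.core.mutate.data.BlockMappingVO;
    import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;

    public class BlockMappingSketch {
      public static void main(String[] args) {
        Map<String, Long> blockRowCount = new HashMap<>();
        blockRowCount.put("0/part-0-0", 1000L);
        Map<String, Long> segmentBlockCount = new HashMap<>();
        segmentBlockCount.put("0", 1L);

        BlockMappingVO mapping = new BlockMappingVO(blockRowCount, segmentBlockCount);

        Map<String, RowCountDetailsVO> details = new HashMap<>();
        details.put("0/part-0-0", new RowCountDetailsVO(1000L, 40L));  // 40 rows already deleted
        mapping.setCompleteBlockRowDetailVO(details);

        RowCountDetailsVO vo = mapping.getCompleteBlockRowDetailVO().get("0/part-0-0");
        System.out.println(vo.getDeletedRowsInBlock());  // 40
      }
    }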

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
new file mode 100644
index 0000000..25451fe
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockletDeleteDeltaCacheLoader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate.data;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+
+/**
+ * This class is responsible for loading delete delta file cache based on
+ * blocklet id of a particular block
+ */
+public class BlockletDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderIntf {
+  private String blockletID;
+  private DataRefNode blockletNode;
+  private AbsoluteTableIdentifier absoluteIdentifier;
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName());
+
+  public BlockletDeleteDeltaCacheLoader(String blockletID,
+       DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) {
+    this.blockletID = blockletID;
+    this.blockletNode = blockletNode;
+    this.absoluteIdentifier = absoluteIdentifier;
+  }
+
+  /**
+   * This method will load the delete delta cache based on blocklet id of particular block with
+   * the help of SegmentUpdateStatusManager.
+   */
+  public void loadDeleteDeltaFileDataToCache() {
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteIdentifier);
+    int[] deleteDeltaFileData = null;
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null;
+    if (null == blockletNode.getDeleteDeltaDataCache()) {
+      try {
+        deleteDeltaFileData = segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+        deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+            segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, null));
+      } catch (Exception e) {
+        LOGGER.debug("Unable to retrieve delete delta files");
+      }
+    } else {
+      deleteDeltaDataCache = blockletNode.getDeleteDeltaDataCache();
+      // if already cache is present then validate the cache using timestamp
+      String cacheTimeStamp = segmentUpdateStatusManager
+          .getTimestampForRefreshCache(blockletID, deleteDeltaDataCache.getCacheTimeStamp());
+      if (null != cacheTimeStamp) {
+        try {
+          deleteDeltaFileData =
+              segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+          deleteDeltaDataCache = new BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+              segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, cacheTimeStamp));
+        } catch (Exception e) {
+          LOGGER.debug("Unable to retrieve delete delta files");
+        }
+      }
+    }
+    blockletNode.setDeleteDeltaDataCache(deleteDeltaDataCache);
+  }
+}
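
Sketch (not part of the patch) of the intended wiring: the scan layer hands a blocklet id, its DataRefNode and the table identifier to the loader, which refreshes the delete delta cache only when needed. The helper method name is hypothetical.

    import org.apache.carbondata.core.datastore.DataRefNode;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
    import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;

    public class CacheLoaderWiringSketch {
      // blockletNode and identifier are expected to come from the query/scan layer;
      // constructing them by hand is out of scope for this sketch
      static void refreshDeleteDeltaCache(String blockletId, DataRefNode blockletNode,
          AbsoluteTableIdentifier identifier) {
        DeleteDeltaCacheLoaderIntf loader =
            new BlockletDeleteDeltaCacheLoader(blockletId, blockletNode, identifier);
        loader.loadDeleteDeltaFileDataToCache();
      }
    }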

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/data/DeleteDeltaCacheLoaderIntf.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/DeleteDeltaCacheLoaderIntf.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/DeleteDeltaCacheLoaderIntf.java
new file mode 100644
index 0000000..924e03a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/DeleteDeltaCacheLoaderIntf.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate.data;
+
+/**
+ * This interface holds all methods required to load delete delta file data to cache
+ */
+public interface DeleteDeltaCacheLoaderIntf {
+
+  void loadDeleteDeltaFileDataToCache();
+
+}
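
The interface keeps callers decoupled from the blocklet-level loader above. As a hedged illustration (not part of the patch), a trivial alternative implementation could look like this, e.g. for tests:

    import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;

    public class NoOpDeleteDeltaCacheLoader implements DeleteDeltaCacheLoaderIntf {
      @Override public void loadDeleteDeltaFileDataToCache() {
        // intentionally empty: no delete delta data is loaded
      }

      public static void main(String[] args) {
        DeleteDeltaCacheLoaderIntf loader = new NoOpDeleteDeltaCacheLoader();
        loader.loadDeleteDeltaFileDataToCache();
      }
    }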

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/mutate/data/RowCountDetailsVO.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/RowCountDetailsVO.java
 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/RowCountDetailsVO.java
new file mode 100644
index 0000000..b7264fa
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/RowCountDetailsVO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate.data;
+
+import java.io.Serializable;
+
+/**
+ * VO class holding the row count details (total rows and deleted rows) of a block.
+ */
+public class RowCountDetailsVO implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918491749L;
+
+  private long totalNumberOfRows;
+
+  private long deletedRowsInBlock;
+
+  public RowCountDetailsVO(long totalNumberOfRows, long deletedRowsInBlock) {
+    this.totalNumberOfRows = totalNumberOfRows;
+    this.deletedRowsInBlock = deletedRowsInBlock;
+  }
+
+  public long getTotalNumberOfRows() {
+    return totalNumberOfRows;
+  }
+
+  public long getDeletedRowsInBlock() {
+    return deletedRowsInBlock;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    RowCountDetailsVO that = (RowCountDetailsVO) obj;
+
+    if (totalNumberOfRows != that.totalNumberOfRows) {
+      return false;
+    }
+    return deletedRowsInBlock == that.deletedRowsInBlock;
+
+  }
+
+  @Override public int hashCode() {
+    int result = (int) (totalNumberOfRows ^ (totalNumberOfRows >>> 32));
+    result = 31 * result + (int) (deletedRowsInBlock ^ (deletedRowsInBlock >>> 32));
+    return result;
+  }
+}
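
Sketch (not part of the patch): with the two counters above, a caller can tell when a block has been fully deleted; the counts are invented values.

    import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;

    public class RowCountSketch {
      public static void main(String[] args) {
        RowCountDetailsVO counts = new RowCountDetailsVO(1000L, 1000L);
        boolean blockFullyDeleted = counts.getDeletedRowsInBlock() >= counts.getTotalNumberOfRows();
        System.out.println(blockFullyDeleted);  // true: every row in the block is deleted
      }
    }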

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java 
b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
deleted file mode 100644
index 1907687..0000000
--- a/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.partition;
-
-/**
- * Partitions the data as per key
- */
-public interface Partitioner<Key> {
-
-  int getPartition(Key key);
-
-}

