http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 76ee084..4e07182 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -17,85 +17,74 @@
 package org.apache.carbondata.datamap.bloom;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
 
 /**
- * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is
- * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that
- * belongs to same block will be written to one index file suffixed with .bloomindex. So the number
+ * BloomDataMap is constructed in CG level (blocklet level).
+ * For each indexed column, a bloom filter is constructed to indicate whether a value
+ * belongs to this blocklet. Bloom filter of blocklet that belongs to same block will
+ * be written to one index file suffixed with .bloomindex. So the number
  * of bloom index file will be equal to that of the blocks.
  */
 @InterfaceAudience.Internal
 public class BloomDataMapWriter extends DataMapWriter {
-  private String dataMapName;
-  private List<String> indexedColumns;
+  private static final LogService LOG = LogServiceFactory.getLogService(
+      BloomDataMapWriter.class.getCanonicalName());
   private int bloomFilterSize;
-  // map column name to ordinal in pages
-  private Map<String, Integer> col2Ordianl;
-  private Map<String, DataType> col2DataType;
-  private String indexShardName;
-  private int currentBlockletId;
+  protected int currentBlockletId;
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
   private List<ObjectOutputStream> currentObjectOutStreams;
-  private List<BloomFilter<byte[]>> indexBloomFilters;
-
-  @InterfaceAudience.Internal
-  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
-      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
-    super(identifier, segment, writeDirectoryPath);
-    dataMapName = dataMapMeta.getDataMapName();
-    indexedColumns = dataMapMeta.getIndexedColumns();
-    this.bloomFilterSize = bloomFilterSize;
-    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
-    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
+  protected List<BloomFilter<byte[]>> indexBloomFilters;
 
-    currentDMFiles = new ArrayList<String>(indexedColumns.size());
-    currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
-    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
+  BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, int bloomFilterSize) throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName);
+    this.bloomFilterSize = bloomFilterSize;
 
-    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
+    currentDMFiles = new ArrayList<String>(indexColumns.size());
+    currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
+    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size());
+    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size());
+    initDataMapFile();
+    resetBloomFilters();
   }
 
   @Override
-  public void onBlockStart(String blockId, String indexShardName) throws IOException {
-    if (this.indexShardName == null) {
-      this.indexShardName = indexShardName;
-      initDataMapFile();
-    }
+  public void onBlockStart(String blockId) throws IOException {
   }
 
   @Override
   public void onBlockEnd(String blockId) throws IOException {
-
   }
 
   @Override
   public void onBlockletStart(int blockletId) {
-    this.currentBlockletId = blockletId;
+  }
+
+  protected void resetBloomFilters() {
     indexBloomFilters.clear();
-    for (int i = 0; i < indexedColumns.size(); i++) {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int i = 0; i < indexColumns.size(); i++) {
       indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
           bloomFilterSize, 0.00001d));
     }
@@ -103,60 +92,51 @@ public class BloomDataMapWriter extends DataMapWriter {
 
   @Override
   public void onBlockletEnd(int blockletId) {
-    try {
-      writeBloomDataMapFile();
-    } catch (Exception e) {
-      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
-        CarbonUtil.closeStreams(objectOutputStream);
-      }
-      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
-        CarbonUtil.closeStreams(dataOutputStream);
-      }
-      throw new RuntimeException(e);
-    }
+    writeBloomDataMapFile();
+    currentBlockletId++;
   }
 
-  // notice that the input pages only contains the indexed columns
   @Override
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
-      throws IOException {
-    col2Ordianl.clear();
-    col2DataType.clear();
-    for (int colId = 0; colId < pages.length; colId++) {
-      String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
-      col2Ordianl.put(columnName, colId);
-      DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
-      col2DataType.put(columnName, columnType);
-    }
-
-    // for each row
-    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
-      // for each indexed column
-      for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-        String indexedCol = indexedColumns.get(indexColId);
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      // for each indexed column, add the data to bloom filter
+      for (int i = 0; i < indexColumns.size(); i++) {
+        Object data = pages[i].getData(rowId);
+        DataType dataType = indexColumns.get(i).getDataType();
         byte[] indexValue;
-        if (DataTypes.STRING == col2DataType.get(indexedCol)
-            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
-          byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
+        if (DataTypes.STRING == dataType) {
+          indexValue = getStringData(data);
+        } else if (DataTypes.BYTE_ARRAY == dataType) {
+          byte[] originValue = (byte[]) data;
+          // String and byte array is LV encoded, L is short type
           indexValue = new byte[originValue.length - 2];
           System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
         } else {
-          Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
-          indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
+          indexValue = CarbonUtil.getValueAsBytes(dataType, data);
         }
-
-        indexBloomFilters.get(indexColId).put(indexValue);
+        indexBloomFilters.get(i).put(indexValue);
       }
     }
   }
 
+  protected byte[] getStringData(Object data) {
+    byte[] lvData = (byte[]) data;
+    byte[] indexValue = new byte[lvData.length - 2];
+    System.arraycopy(lvData, 2, indexValue, 0, lvData.length - 2);
+    return indexValue;
+  }
+
   private void initDataMapFile() throws IOException {
-    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
-    dataMapDir = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR + this.indexShardName;
-    FileFactory.mkdirs(dataMapDir, FileFactory.getFileType(dataMapDir));
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      String dmFile = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR +
-          indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+    if (!FileFactory.isFileExist(dataMapPath)) {
+      if (!FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) {
+        throw new IOException("Failed to create directory " + dataMapPath);
+      }
+    }
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+      String dmFile = dataMapPath + CarbonCommonConstants.FILE_SEPARATOR +
+          indexColumns.get(indexColId).getColName() + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
       DataOutputStream dataOutStream = null;
       ObjectOutputStream objectOutStream = null;
       try {
@@ -175,44 +155,45 @@ public class BloomDataMapWriter extends DataMapWriter {
     }
   }
 
-  private void writeBloomDataMapFile() throws IOException {
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      BloomDMModel model = new BloomDMModel(this.currentBlockletId,
-          indexBloomFilters.get(indexColId));
-      // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
-      // In lower version, we use default java serializer to write bloomfilter.
-      this.currentObjectOutStreams.get(indexColId).writeObject(model);
-      this.currentObjectOutStreams.get(indexColId).flush();
-      this.currentDataOutStreams.get(indexColId).flush();
+  protected void writeBloomDataMapFile() {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    try {
+      for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+        BloomDMModel model =
+            new BloomDMModel(this.currentBlockletId, indexBloomFilters.get(indexColId));
+        // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
+        // In lower version, we use default java serializer to write bloomfilter.
+        this.currentObjectOutStreams.get(indexColId).writeObject(model);
+        this.currentObjectOutStreams.get(indexColId).flush();
+        this.currentDataOutStreams.get(indexColId).flush();
+      }
+    } catch (Exception e) {
+      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
+        CarbonUtil.closeStreams(objectOutputStream);
+      }
+      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
+        CarbonUtil.closeStreams(dataOutputStream);
+      }
+      throw new RuntimeException(e);
+    } finally {
+      resetBloomFilters();
     }
   }
 
   @Override
   public void finish() throws IOException {
-    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
-      CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
-          this.currentObjectOutStreams.get(indexColId));
-      commitFile(this.currentDMFiles.get(indexColId));
+    if (indexBloomFilters.size() > 0) {
+      writeBloomDataMapFile();
     }
+    releaseResouce();
   }
 
-  @Override
-  protected void commitFile(String dataMapFile) throws IOException {
-    super.commitFile(dataMapFile);
+  protected void releaseResouce() {
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
+      CarbonUtil.closeStreams(
+          currentDataOutStreams.get(indexColId), currentObjectOutStreams.get(indexColId));
+    }
   }
 
-  /**
-   * create and return path that will store the datamap
-   *
-   * @param dataPath patch to store the carbondata factdata
-   * @param dataMapName datamap name
-   * @return path to store the datamap
-   * @throws IOException
-   */
-  public static String genDataMapStorePath(String dataPath, String dataMapName)
-      throws IOException {
-    String dmDir = dataPath + File.separator + dataMapName;
-    FileFactory.mkdirs(dmDir, FileFactory.getFileType(dmDir));
-    return dmDir;
-  }
 }
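
For context, a minimal sketch of the per-column bloom index pattern this writer implements, using the Guava APIs referenced in the diff (BloomFilter.create, Funnels.byteArrayFunnel) and plain Java serialization for older Guava versions without writeTo/readFrom. The file name and sizes here are illustrative, not taken from the commit; the real writer serializes a BloomDMModel wrapping the filter.

    import java.io.FileOutputStream;
    import java.io.ObjectOutputStream;
    import java.nio.charset.StandardCharsets;
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    // one bloom filter per indexed column per blocklet (illustrative size and FPP)
    BloomFilter<byte[]> filter = BloomFilter.create(Funnels.byteArrayFunnel(), 32000, 0.00001d);
    filter.put("some_column_value".getBytes(StandardCharsets.UTF_8));
    // lower guava versions have no writeTo/readFrom, so fall back to default Java serialization
    try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("col1.bloomindex"))) {
      out.writeObject(filter);
    }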

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 21a0b8e..0993218 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -33,15 +33,12 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -58,7 +55,6 @@ public class MinMaxDataWriter extends DataMapWriter {
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String dataMapName;
   private int columnCnt;
   private DataType[] dataTypeArray;
   private String indexShardName;
@@ -71,30 +67,25 @@ public class MinMaxDataWriter extends DataMapWriter {
    */
   private Map<Integer, Integer> origin2MinMaxOrdinal = new HashMap<>();
 
-  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String dataWritePath) {
-    super(identifier, segment, dataWritePath);
-    this.dataMapName = dataMapName;
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        identifier.getDatabaseName(), identifier.getTableName());
-    List<CarbonColumn> cols = carbonTable.getCreateOrderColumn(identifier.getTableName());
-    this.columnCnt = cols.size();
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(identifier.getTableName());
-    for (int i = 0; i < dimensions.size(); i++) {
-      this.origin2MinMaxOrdinal.put(dimensions.get(i).getSchemaOrdinal(),
-          dimensions.get(i).getOrdinal());
+  public MinMaxDataWriter(CarbonTable carbonTable, DataMapSchema dataMapSchema, Segment segment,
+      String shardName, List<CarbonColumn> indexColumns) {
+    super(carbonTable.getTablePath(), dataMapSchema.getDataMapName(), indexColumns, segment,
+        shardName);
+    this.columnCnt = indexColumns.size();
+    for (CarbonColumn col : indexColumns) {
+      this.origin2MinMaxOrdinal.put(col.getSchemaOrdinal(), col.getOrdinal());
     }
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(identifier.getTableName());
-    for (int i = 0; i < measures.size(); i++) {
-      this.origin2MinMaxOrdinal.put(measures.get(i).getSchemaOrdinal(),
-          dimensions.size() + measures.get(i).getOrdinal());
+    if (this.dataTypeArray == null) {
+      this.dataTypeArray = new DataType[this.columnCnt];
+      for (int i = 0; i < this.columnCnt; i++) {
+        this.dataTypeArray[i] = indexColumns.get(i).getDataType();
+      }
     }
   }
 
-  @Override public void onBlockStart(String blockId, String indexShardName) {
+  @Override public void onBlockStart(String blockId) {
     if (blockMinMaxMap == null) {
       blockMinMaxMap = new HashMap<>();
-      this.indexShardName = indexShardName;
     }
   }
 
@@ -111,46 +102,15 @@ public class MinMaxDataWriter extends DataMapWriter {
   }
 
   @Override
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
-    // Calculate Min and Max value within this page.
-
-    // As part of example we are extracting Min Max values Manually. The same can be done from
-    // retrieving the page statistics. For e.g.
-
-    // if (pageLevelMin == null && pageLevelMax == null) {
-    //    pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //        pages[0].getStatistics().getMin());
-    //    pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //        pages[0].getStatistics().getMax());
-    //  } else {
-    //    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], CarbonUtil
-    //        .getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //            pages[0].getStatistics().getMin())) > 0) {
-    //      pageLevelMin[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //          pages[0].getStatistics().getMin());
-    //    }
-    //    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], CarbonUtil
-    //        .getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //            pages[0].getStatistics().getMax())) < 0) {
-    //      pageLevelMax[1] = CarbonUtil.getValueAsBytes(pages[0].getStatistics().getDataType(),
-    //          pages[0].getStatistics().getMax());
-    //    }
-
-    if (this.dataTypeArray == null) {
-      this.dataTypeArray = new DataType[this.columnCnt];
-      for (int i = 0; i < this.columnCnt; i++) {
-        this.dataTypeArray[i] = pages[i].getDataType();
-      }
-    }
-
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
     // as an example, we don't use page-level min-max generated by native carbondata here, we get
     // the min-max by comparing each row
-    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+    for (int rowId = 0; rowId < pageSize; rowId++) {
       for (int colIdx = 0; colIdx < columnCnt; colIdx++) {
         Object originValue = pages[colIdx].getData(rowId);
+        DataType dataType = dataTypeArray[colIdx];
         // for string & bytes_array, data is prefixed with length, need to remove it
-        if (DataTypes.STRING == pages[colIdx].getDataType()
-            || DataTypes.BYTE_ARRAY == pages[colIdx].getDataType()) {
+        if (DataTypes.STRING == dataType || DataTypes.BYTE_ARRAY == dataType) {
           byte[] valueMin0 = (byte[]) pageLevelMin[colIdx];
           byte[] valueMax0 = (byte[]) pageLevelMax[colIdx];
           byte[] value1 = (byte[]) originValue;
@@ -164,10 +124,10 @@ public class MinMaxDataWriter extends DataMapWriter {
             pageLevelMax[colIdx] = new byte[value1.length - 2];
             System.arraycopy(value1, 2, (byte[]) pageLevelMax[colIdx], 0, value1.length - 2);
           }
-        } else if (DataTypes.INT == pages[colIdx].getDataType()) {
-          updateMinMax(colIdx, originValue, pages[colIdx].getDataType());
+        } else if (DataTypes.INT == dataType) {
+          updateMinMax(colIdx, originValue, dataType);
         } else {
-          throw new RuntimeException("Not implement yet");
+          throw new UnsupportedOperationException("Not implement yet");
         }
       }
     }
@@ -276,8 +236,7 @@ public class MinMaxDataWriter extends DataMapWriter {
    */
   public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
       String blockId) throws IOException {
-    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
-    String filePath = dataMapDir + File.separator + blockId + ".minmaxindex";
+    String filePath = dataMapPath + File.separator + blockId + ".minmaxindex";
     BufferedWriter brWriter = null;
     DataOutputStream dataOutStream = null;
     try {
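
As a side note, the STRING/BYTE_ARRAY branch of onPageAdded above works on length-prefixed (LV) bytes: the first two bytes carry the length, and min/max is tracked by unsigned lexicographic comparison (the ordering ByteUtil.UnsafeComparer provides). A small self-contained sketch of that idea; the helper names are illustrative, not part of the commit.

    // strip the 2-byte length prefix of an LV-encoded value (illustrative helper)
    static byte[] stripLengthPrefix(byte[] lv) {
      byte[] value = new byte[lv.length - 2];
      System.arraycopy(lv, 2, value, 0, lv.length - 2);
      return value;
    }

    // unsigned lexicographic compare of two byte arrays
    static int compareBytes(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
        if (diff != 0) {
          return diff;
        }
      }
      return a.length - b.length;
    }

    // per-row min/max update for one column:
    // if (compareBytes(candidate, currentMin) < 0) currentMin = candidate;
    // if (compareBytes(candidate, currentMax) > 0) currentMax = candidate;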

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index c110887..4197b79 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -51,25 +52,25 @@ import org.apache.commons.lang3.StringUtils;
 public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   private static final LogService LOGGER = LogServiceFactory.getLogService(
       MinMaxIndexDataMapFactory.class.getName());
+  private DataMapSchema dataMapSchema;
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private AbsoluteTableIdentifier identifier;
 
+  public MinMaxIndexDataMapFactory(CarbonTable carbonTable) {
+    super(carbonTable);
+  }
+
   // this is an example for datamap, we can choose the columns and operations that
   // will be supported by this datamap. Furthermore, we can add cache-support for this datamap.
-  @Override public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+  @Override public void init(DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
+    this.dataMapSchema = dataMapSchema;
     this.identifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
     // columns that will be indexed
     List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName());
-    List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new Transformer() {
-      @Override public Object transform(Object o) {
-        return ((CarbonColumn) o).getColName();
-      }
-    });
-    LOGGER.info("MinMaxDataMap support index columns: " + StringUtils.join(minMaxCols, ", "));
 
     // operations that will be supported on the indexed columns
     List<ExpressionType> optOperations = new ArrayList<>();
@@ -80,17 +81,24 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
     optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
     optOperations.add(ExpressionType.NOT_EQUALS);
     LOGGER.error("MinMaxDataMap support operations: " + 
StringUtils.join(optOperations, ", "));
-    this.dataMapMeta = new DataMapMeta(minMaxCols, optOperations);
+    this.dataMapMeta = new DataMapMeta(allColumns, optOperations);
   }
 
   /**
    * createWriter will return the MinMaxDataWriter.
    *
    * @param segment
+   * @param shardName
    * @return
    */
-  @Override public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
-    return new MinMaxDataWriter(identifier, dataMapName, segment, writeDirectoryPath);
+  @Override public DataMapWriter createWriter(Segment segment, String shardName) {
+    return new MinMaxDataWriter(carbonTable, dataMapSchema, segment, shardName,
+        dataMapMeta.getIndexedColumns());
+  }
+
+  @Override public DataMapRefresher createRefresher(Segment segment, String shardName)
+      throws IOException {
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index ba38319..dca5c90 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -22,15 +22,19 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  * FG level of lucene DataMap
@@ -40,6 +44,11 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(LuceneCoarseGrainDataMapFactory.class.getName());
 
+  public LuceneCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
+  }
+
   /**
    * Get the datamap for segmentid
    */
@@ -49,7 +58,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
     CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
     try {
       dataMap.init(new DataMapModel(
-          LuceneDataMapWriter.genDataMapStorePath(
+          DataMapWriter.getDefaultDataMapPath(
               tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + 
e.getMessage());
@@ -69,7 +78,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
   }
 
   @Override
-  public DataMapLevel getDataMapType() {
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.CG;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 1cde0c1..d52cef9 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -51,7 +52,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
@@ -59,9 +59,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
  * Base implementation for CG and FG lucene DataMapFactory.
  */
 @InterfaceAudience.Internal
-abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> {
-
-  static final String TEXT_COLUMNS = "text_columns";
+abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactory<T> {
 
   /**
    * Logger
@@ -88,26 +86,17 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    */
   AbsoluteTableIdentifier tableIdentifier = null;
 
-  /**
-   * indexed carbon columns for lucene
-   */
-  List<String> indexedCarbonColumns = null;
-
-  CarbonTable carbonTable = null;
-
-
-  @Override
-  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException {
+  public LuceneDataMapFactoryBase(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
     Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier());
     Objects.requireNonNull(dataMapSchema);
 
-    this.carbonTable = carbonTable;
     this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
     // validate DataMapSchema and get index columns
-    List<String> indexedColumns =  validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+    List<CarbonColumn> indexedColumns =  carbonTable.getIndexedColumns(dataMapSchema);
 
     // add optimizedOperations
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
@@ -126,55 +115,6 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   }
 
   /**
-   * validate Lucene DataMap
-   * 1. require TEXT_COLUMNS property
-   * 2. TEXT_COLUMNS can't contains illegal argument(empty, blank)
-   * 3. TEXT_COLUMNS can't contains duplicate same columns
-   * 4. TEXT_COLUMNS should be exists in table columns
-   * 5. TEXT_COLUMNS support only String DataType columns
-   */
-  public static List<String> validateAndGetIndexedColumns(DataMapSchema dataMapSchema,
-      CarbonTable carbonTable) throws MalformedDataMapCommandException {
-    String textColumnsStr = dataMapSchema.getProperties().get(TEXT_COLUMNS);
-    if (textColumnsStr == null || StringUtils.isBlank(textColumnsStr)) {
-      throw new MalformedDataMapCommandException(
-          "Lucene DataMap require proper TEXT_COLUMNS property.");
-    }
-    String[] textColumns = textColumnsStr.split(",", -1);
-    for (int i = 0; i < textColumns.length; i++) {
-      textColumns[i] = textColumns[i].trim().toLowerCase();
-    }
-    for (int i = 0; i < textColumns.length; i++) {
-      if (textColumns[i].isEmpty()) {
-        throw new MalformedDataMapCommandException("TEXT_COLUMNS contains 
illegal argument.");
-      }
-      for (int j = i + 1; j < textColumns.length; j++) {
-        if (textColumns[i].equals(textColumns[j])) {
-          throw new MalformedDataMapCommandException(
-              "TEXT_COLUMNS has duplicate columns :" + textColumns[i]);
-        }
-      }
-    }
-    List<String> indexedCarbonColumns = new ArrayList<>(textColumns.length);
-    for (int i = 0; i < textColumns.length; i++) {
-      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
-      if (null == column) {
-        throw new MalformedDataMapCommandException("TEXT_COLUMNS: " + 
textColumns[i]
-            + " does not exist in table. Please check create DataMap 
statement.");
-      } else if (column.getDataType() != DataTypes.STRING) {
-        throw new MalformedDataMapCommandException(
-            "TEXT_COLUMNS only supports String column. " + "Unsupported 
column: " + textColumns[i]
-                + ", DataType: " + column.getDataType());
-      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
-        throw new MalformedDataMapCommandException(
-            "TEXT_COLUMNS cannot contain dictionary column " + 
column.getColName());
-      }
-      indexedCarbonColumns.add(column.getColName());
-    }
-    return indexedCarbonColumns;
-  }
-
-  /**
    * this method will delete the datamap folders during drop datamap
    * @throws MalformedDataMapCommandException
    */
@@ -206,10 +146,16 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    * Return a new write for this datamap
    */
   @Override
-  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
-    LOGGER.info("lucene data write to " + writeDirectoryPath);
-    return new LuceneDataMapWriter(tableIdentifier, dataMapName, segment, writeDirectoryPath, true,
-        indexedCarbonColumns);
+  public DataMapWriter createWriter(Segment segment, String shardName) {
+    LOGGER.info("lucene data write to " + shardName);
+    return new LuceneDataMapWriter(getCarbonTable().getTablePath(), dataMapName,
+        dataMapMeta.getIndexedColumns(), segment, shardName, true);
+  }
+
+  @Override
+  public DataMapRefresher createRefresher(Segment segment, String shardName) {
+    return new LuceneDataMapRefresher(getCarbonTable().getTablePath(), dataMapName,
+        segment, shardName, dataMapMeta.getIndexedColumns());
   }
 
   /**
@@ -280,7 +226,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
     try {
       // there can be multiple lucene datamaps present on a table, so get all datamaps and form
      // the path till the index file directories in all datamaps folders present in each segment
-      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
     } catch (IOException ex) {
       LOGGER.error("failed to get datamaps");
     }
@@ -302,4 +248,26 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
     }
     return indexDirs.toArray(new CarbonFile[0]);
   }
+
+  /**
+   * Further validate whether it is string column and dictionary column.
+   * Currently only string and non-dictionary column is supported for Lucene DataMap
+   */
+  @Override
+  public void validate() throws MalformedDataMapCommandException {
+    super.validate();
+    List<CarbonColumn> indexColumns = getCarbonTable().getIndexedColumns(getDataMapSchema());
+
+    for (CarbonColumn column : indexColumns) {
+      if (column.getDataType() != DataTypes.STRING) {
+        throw new MalformedDataMapCommandException(String.format(
+            "Only String column is supported, column '%s' is %s type. ",
+            column.getColName(), column.getDataType()));
+      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
+        throw new MalformedDataMapCommandException(String.format(
+            "Dictionary column is not supported, column '%s' is dictionary 
column",
+            column.getColName()));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
new file mode 100644
index 0000000..ee500ef
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+public class LuceneDataMapRefresher implements DataMapRefresher {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  private String dataMapPath;
+
+  private List<CarbonColumn> indexColumns;
+
+  private int columnsCount;
+
+  private IndexWriter indexWriter = null;
+
+  private IndexWriter pageIndexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  LuceneDataMapRefresher(String tablePath, String dataMapName,
+      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
+    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+    this.indexColumns = indexColumns;
+    this.columnsCount = indexColumns.size();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path exists, should delete it because we are
+    // rebuilding the whole datamap for all segments
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+    if (!fs.mkdirs(indexPath)) {
+      LOGGER.error("Failed to create directory " + indexPath);
+    }
+
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+  }
+
+  private IndexWriter createPageIndexWriter() throws IOException {
+    // save index data into ram, write into disk after one page finished
+    RAMDirectory ramDir = new RAMDirectory();
+    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+  }
+
+  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
+
+    Directory directory = pageIndexWriter.getDirectory();
+
+    // close ram writer
+    pageIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(directory);
+
+    // delete this ram data
+    directory.close();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
+    if (rowId == 0) {
+      if (pageIndexWriter != null) {
+        addPageIndex(pageIndexWriter);
+      }
+      pageIndexWriter = createPageIndexWriter();
+    }
+
+    // create a new document
+    Document doc = new Document();
+
+    // add blocklet Id
+    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
+
+    // add page id
+    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
+
+    // add row id
+    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
+    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+
+    // add other fields
+    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+      CarbonColumn column = indexColumns.get(colIdx);
+      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+    }
+
+    pageIndexWriter.addDocument(doc);
+  }
+
+  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
+    if (type == DataTypes.STRING) {
+      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
+    } else if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
+      field.setIntValue((int) value);
+      doc.add(field);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no short type
+      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue((short) value);
+      doc.add(field);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      doc.add(new IntPoint(fieldName, (int) value));
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      doc.add(new LongPoint(fieldName, (long) value));
+    } else if (type == DataTypes.FLOAT) {
+      doc.add(new FloatPoint(fieldName, (float) value));
+    } else if (type == DataTypes.DOUBLE) {
+      doc.add(new DoublePoint(fieldName, (double) value));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get data value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get
+    } else if (type == DataTypes.BOOLEAN) {
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue((boolean) value ? 1 : 0);
+      doc.add(field);
+    } else {
+      LOGGER.error("unsupport data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  @Override
+  public void finish() throws IOException {
+    if (indexWriter != null && pageIndexWriter != null) {
+      addPageIndex(pageIndexWriter);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
+  }
+
+}
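
Both the refresher above and the writer in the next file buffer one unit of work (a page or a blocklet) in an in-memory Lucene index and then merge it into the on-disk index. A minimal sketch of that pattern with plain Lucene 6.x APIs, writing to a local FSDirectory instead of HdfsDirectory for simplicity; the path and field values are illustrative.

    import java.nio.file.Paths;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.IntPoint;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.RAMDirectory;

    public class RamBufferedIndexSketch {
      public static void main(String[] args) throws Exception {
        StandardAnalyzer analyzer = new StandardAnalyzer();
        // long-lived on-disk index (illustrative local path)
        IndexWriter diskWriter = new IndexWriter(
            FSDirectory.open(Paths.get("/tmp/lucene-dm")), new IndexWriterConfig(analyzer));

        // buffer one page/blocklet worth of documents in memory
        RAMDirectory ramDir = new RAMDirectory();
        IndexWriter ramWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
        Document doc = new Document();
        doc.add(new IntPoint("blockletId", 0));
        doc.add(new TextField("name", "some indexed text", Field.Store.NO));
        ramWriter.addDocument(doc);

        // flush the buffered unit into the on-disk index, then drop the RAM copy
        ramWriter.close();
        diskWriter.addIndexes(ramDir);
        ramDir.close();
        diskWriter.close();
      }
    }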

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 95823bb..12fa1ef 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.datamap.lucene;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
@@ -30,11 +29,10 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,12 +74,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private Analyzer analyzer = null;
 
-  private String dataMapName = null;
-
   private boolean isFineGrain = true;
 
-  private List<String> indexedCarbonColumns = null;
-
   public static final String BLOCKLETID_NAME = "blockletId";
 
   private String indexShardName = null;
@@ -90,69 +84,54 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   public static final String ROWID_NAME = "rowId";
 
-  LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String writeDirectoryPath, boolean isFineGrain, List<String> indexedCarbonColumns) {
-    super(identifier, segment, writeDirectoryPath);
-    this.dataMapName = dataMapName;
+  LuceneDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
+      Segment segment, String shardName, boolean isFineGrain) {
+    super(tablePath, dataMapName, indexColumns, segment, shardName);
     this.isFineGrain = isFineGrain;
-    this.indexedCarbonColumns = indexedCarbonColumns;
-  }
-
-  private String getIndexPath(String taskName) {
-    if (isFineGrain) {
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName,
-          taskName);
-    } else {
-      // TODO: where write data in coarse grain data map
-      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName,
-          taskName);
-    }
   }
 
   /**
    * Start of new block notification.
    */
-  public void onBlockStart(String blockId, String indexShardName) throws IOException {
-    if (this.indexShardName == null || !this.indexShardName.equals(indexShardName)) {
-      if (indexWriter != null) {
-        return;
-      }
-      // get index path, put index data into segment's path
-      String strIndexPath = getIndexPath(indexShardName);
-      Path indexPath = FileFactory.getPath(strIndexPath);
-      FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-      // if index path not exists, create it
-      if (!fs.exists(indexPath)) {
-        fs.mkdirs(indexPath);
+  public void onBlockStart(String blockId) throws IOException {
+    if (indexWriter != null) {
+      return;
+    }
+    // get index path, put index data into segment's path
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path not exists, create it
+    if (!fs.exists(indexPath)) {
+      if (!fs.mkdirs(indexPath)) {
+        throw new IOException("Failed to create directory " + dataMapPath);
       }
+    }
 
-      if (null == analyzer) {
-        analyzer = new StandardAnalyzer();
-      }
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
 
-      // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
-      // and disable the cache for the index writer, it will be closed on closing the writer
-      Configuration conf = new Configuration();
-      conf.set("fs.hdfs.impl.disable.cache", "true");
-
-      // create a index writer
-      Directory indexDir = new HdfsDirectory(indexPath, conf);
-
-      IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-      if (CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-              CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-        indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-      } else {
-        indexWriterConfig
-            .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-      }
+    // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
+    // and disable the cache for the index writer, it will be closed on closing the writer
+    Configuration conf = new Configuration();
+    conf.set("fs.hdfs.impl.disable.cache", "true");
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, conf);
 
-      indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
     }
 
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
   }
 
   /**
@@ -162,18 +141,30 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   }
 
+  private RAMDirectory ramDir;
+  private IndexWriter ramIndexWriter;
+
   /**
    * Start of new blocklet notification.
    */
-  public void onBlockletStart(int blockletId) {
-
+  public void onBlockletStart(int blockletId) throws IOException {
+    // save index data into ram, write into disk after one page finished
+    ramDir = new RAMDirectory();
+    ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
   }
 
   /**
    * End of blocklet notification
    */
-  public void onBlockletEnd(int blockletId) {
+  public void onBlockletEnd(int blockletId) throws IOException {
+    // close ram writer
+    ramIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(ramDir);
 
+    // delete this ram data
+    ramDir.close();
   }
 
   /**
@@ -182,72 +173,54 @@ public class LuceneDataMapWriter extends DataMapWriter {
   * Implementation should copy the content of `pages` as needed, because `pages` memory
    * may be freed after this method returns, if using unsafe column page.
    */
-  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    IndexWriter ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-
-    int columnsCount = pages.length;
-    if (columnsCount <= 0) {
-      LOGGER.warn("empty data");
-      ramIndexWriter.close();
-      ramDir.close();
-      return;
-    }
-
-    int pageSize = pages[0].getPageSize();
+  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
+      throws IOException {
     for (int rowId = 0; rowId < pageSize; rowId++) {
       // create a new document
       Document doc = new Document();
       // add blocklet Id
-      doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId }));
+      doc.add(new IntPoint(BLOCKLETID_NAME, blockletId));
       doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
       //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
 
       // add page id and row id in Fine Grain data map
       if (isFineGrain) {
         // add page Id
-        doc.add(new IntPoint(PAGEID_NAME, new int[] { pageId }));
+        doc.add(new IntPoint(PAGEID_NAME, pageId));
         doc.add(new StoredField(PAGEID_NAME, pageId));
         //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
 
         // add row id
-        doc.add(new IntPoint(ROWID_NAME, new int[] { rowId }));
+        doc.add(new IntPoint(ROWID_NAME, rowId));
         doc.add(new StoredField(ROWID_NAME, rowId));
         //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
       }
 
       // add indexed columns value into the document
-      for (ColumnPage page : pages) {
-        if (!page.getNullBits().get(rowId)) {
-          addField(doc, page, rowId, Field.Store.NO);
+      List<CarbonColumn> indexColumns = getIndexColumns();
+      for (int i = 0; i < pages.length; i++) {
+        // add to lucene only if value is not null
+        if (!pages[i].getNullBits().get(rowId)) {
+          addField(doc, pages[i].getData(rowId), indexColumns.get(i), Field.Store.NO);
         }
       }
 
       // add this document
       ramIndexWriter.addDocument(doc);
-
     }
-    // close ram writer
-    ramIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(new Directory[] { ramDir });
 
-    // delete this ram data
-    ramDir.close();
   }
 
-  private boolean addField(Document doc, ColumnPage page, int rowId, 
Field.Store store) {
+  private boolean addField(Document doc, Object data, CarbonColumn column, 
Field.Store store) {
     //get field name
-    String fieldName = page.getColumnSpec().getFieldName();
+    String fieldName = column.getColName();
 
     //get field type
-    DataType type = page.getColumnSpec().getSchemaDataType();
+    DataType type = column.getDataType();
 
     if (type == DataTypes.BYTE) {
       // byte type , use int range to deal with byte, lucene has no byte type
-      byte value = page.getByte(rowId);
+      byte value = (byte) data;
       IntRangeField field =
           new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] 
{ Byte.MAX_VALUE });
       field.setIntValue(value);
@@ -259,7 +232,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
       }
     } else if (type == DataTypes.SHORT) {
       // short type , use int range to deal with short type, lucene has no 
short type
-      short value = page.getShort(rowId);
+      short value = (short) data;
       IntRangeField field = new IntRangeField(fieldName, new int[] { 
Short.MIN_VALUE },
           new int[] { Short.MAX_VALUE });
       field.setShortValue(value);
@@ -271,8 +244,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
       }
     } else if (type == DataTypes.INT) {
       // int type , use int point to deal with int type
-      int value = page.getInt(rowId);
-      doc.add(new IntPoint(fieldName, new int[] { value }));
+      int value = (int) data;
+      doc.add(new IntPoint(fieldName, value));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
@@ -280,27 +253,27 @@ public class LuceneDataMapWriter extends DataMapWriter {
       }
     } else if (type == DataTypes.LONG) {
       // long type , use long point to deal with long type
-      long value = page.getLong(rowId);
-      doc.add(new LongPoint(fieldName, new long[] { value }));
+      long value = (long) data;
+      doc.add(new LongPoint(fieldName, value));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
       }
     } else if (type == DataTypes.FLOAT) {
-      float value = page.getFloat(rowId);
-      doc.add(new FloatPoint(fieldName, new float[] { value }));
+      float value = (float) data;
+      doc.add(new FloatPoint(fieldName, value));
       if (store == Field.Store.YES) {
         doc.add(new FloatPoint(fieldName, value));
       }
     } else if (type == DataTypes.DOUBLE) {
-      double value = page.getDouble(rowId);
-      doc.add(new DoublePoint(fieldName, new double[] { value }));
+      double value = (double) data;
+      doc.add(new DoublePoint(fieldName, value));
       if (store == Field.Store.YES) {
         doc.add(new DoublePoint(fieldName, value));
       }
     } else if (type == DataTypes.STRING) {
-      byte[] value = page.getBytes(rowId);
+      byte[] value = (byte[]) data;
       // TODO: how to get string value
       String strValue = null;
       try {
@@ -310,11 +283,11 @@ public class LuceneDataMapWriter extends DataMapWriter {
       }
       doc.add(new TextField(fieldName, strValue, store));
     } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
+      throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
+      throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.BOOLEAN) {
-      boolean value = page.getBoolean(rowId);
+      boolean value = (boolean) data;
       IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new 
int[] { 1 });
       field.setIntValue(value ? 1 : 0);
       doc.add(field);
@@ -339,23 +312,4 @@ public class LuceneDataMapWriter extends DataMapWriter {
     }
   }
 
-  /**
-   * Return store path for datamap
-   */
-  public static String genDataMapStorePath(String tablePath, String segmentId, 
String dataMapName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + 
File.separator + dataMapName;
-  }
-
-  /**
-   * Return store path for datamap based on the taskName,if three tasks get 
launched during loading,
-   * then three folders will be created based on the three task Ids and lucene 
index file will be
-   * written into those folders
-   *
-   * @return store path based on taskID
-   */
-  public static String genDataMapStorePathOnTaskId(String tablePath, String 
segmentId,
-      String dataMapName, String taskName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + 
File.separator + dataMapName
-        + File.separator + taskName;
-  }
 }

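The refactor above moves the per-blocklet RAMDirectory buffering out of onPageAdded and into the blocklet lifecycle: onBlockletStart opens an in-memory index, onPageAdded appends one Lucene document per row (blocklet id, plus page and row id for fine-grain maps, plus the indexed column values), and onBlockletEnd merges the buffered index into the on-disk writer via addIndexes and releases the memory. A minimal, self-contained sketch of that flow follows; it assumes Lucene 6.x, substitutes a second RAMDirectory for the HdfsDirectory the real writer targets, and uses illustrative field names only.

    import java.io.IOException;

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.IntPoint;
    import org.apache.lucene.document.StoredField;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class BlockletBufferingSketch {
      public static void main(String[] args) throws IOException {
        Analyzer analyzer = new StandardAnalyzer();

        // Stand-in for the on-disk index; the writer in the patch uses an
        // HdfsDirectory under the segment's datamap path instead.
        Directory mainDir = new RAMDirectory();
        IndexWriter mainWriter = new IndexWriter(mainDir, new IndexWriterConfig(analyzer));

        // onBlockletStart: buffer this blocklet's documents in memory.
        RAMDirectory ramDir = new RAMDirectory();
        IndexWriter ramWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));

        // onPageAdded: one document per row, keyed by blocklet id.
        int blockletId = 0;
        Document doc = new Document();
        doc.add(new IntPoint("blockletId", blockletId));      // single-value IntPoint, as in the patch
        doc.add(new StoredField("blockletId", blockletId));   // stored copy so it can be read back
        doc.add(new TextField("country", "china", Field.Store.NO));
        ramWriter.addDocument(doc);

        // onBlockletEnd: flush the in-memory index into the main index, then drop it.
        ramWriter.close();
        mainWriter.addIndexes(ramDir);
        ramDir.close();

        mainWriter.close();
      }
    }

The same close, addIndexes, close-directory sequence is what the removed LuceneIndexRefreshBuilder (further down in this commit) performed in its addPageIndex method.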
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 1104d09..86fba32 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -87,7 +87,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
    */
   private Analyzer analyzer;
 
-  private String taskName;
+  private String filePath;
 
   LuceneFineGrainDataMap(Analyzer analyzer) {
     this.analyzer = analyzer;
@@ -102,7 +102,7 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
 
     LOGGER.info("Lucene index read path " + indexPath.toString());
 
-    this.taskName = indexPath.getName();
+    this.filePath = indexPath.getName();
 
     // get file system , use hdfs file system , realized in solr project
     FileSystem fs = FileFactory.getFileSystem(indexPath);
@@ -277,7 +277,7 @@ public class LuceneFineGrainDataMap extends 
FineGrainDataMap {
       }
 
       // add a FineGrainBlocklet
-      blocklets.add(new FineGrainBlocklet(taskName, blockletId, pages));
+      blocklets.add(new FineGrainBlocklet(filePath, blockletId, pages));
     }
 
     return blocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 1dae9b5..ec9283d 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -22,13 +22,17 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
  * CG level of lucene DataMap
@@ -36,6 +40,11 @@ import org.apache.carbondata.core.memory.MemoryException;
 @InterfaceAudience.Internal
 public class LuceneFineGrainDataMapFactory extends 
LuceneDataMapFactoryBase<FineGrainDataMap> {
 
+  public LuceneFineGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema 
dataMapSchema)
+      throws MalformedDataMapCommandException {
+    super(carbonTable, dataMapSchema);
+  }
+
   /**
    * Get the datamap for segmentid
    */
@@ -44,7 +53,7 @@ public class LuceneFineGrainDataMapFactory extends 
LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
     try {
       dataMap.init(new DataMapModel(
-          LuceneDataMapWriter.genDataMapStorePath(
+          DataMapWriter.getDefaultDataMapPath(
               tableIdentifier.getTablePath(), segment.getSegmentNo(), 
dataMapName)));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + 
e.getMessage());
@@ -74,7 +83,7 @@ public class LuceneFineGrainDataMapFactory extends 
LuceneDataMapFactoryBase<Fine
   }
 
   @Override
-  public DataMapLevel getDataMapType() {
+  public DataMapLevel getDataMapLevel() {
     return DataMapLevel.FG;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
deleted file mode 100644
index 2d5dcf8..0000000
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneIndexRefreshBuilder.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
-import org.apache.lucene.codecs.lucene62.Lucene62Codec;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.IntRangeField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.solr.store.hdfs.HdfsDirectory;
-
-public class LuceneIndexRefreshBuilder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
-
-  private String strIndexPath;
-
-  private String[] indexColumns;
-  private DataType[] dataTypes;
-
-  private int columnsCount;
-
-  private IndexWriter indexWriter = null;
-
-  private IndexWriter pageIndexWriter = null;
-
-  private Analyzer analyzer = null;
-
-  public LuceneIndexRefreshBuilder(String strIndexPath, String[] indexColumns,
-      DataType[] dataTypes) {
-    this.strIndexPath = strIndexPath;
-    this.indexColumns = indexColumns;
-    this.columnsCount = indexColumns.length;
-    this.dataTypes = dataTypes;
-  }
-
-  public void initialize() throws IOException {
-    // get index path, put index data into segment's path
-    Path indexPath = FileFactory.getPath(strIndexPath);
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-    // if index path not exists, create it
-    if (!fs.exists(indexPath)) {
-      fs.mkdirs(indexPath);
-    }
-
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
-
-    // create a index writer
-    Directory indexDir = new HdfsDirectory(indexPath, 
FileFactory.getConfiguration());
-
-    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-    if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-        
.equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT))
 {
-      indexWriterConfig.setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-    } else {
-      indexWriterConfig
-          .setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-    }
-
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    IndexWriter ramIndexWriter = new IndexWriter(ramDir, new 
IndexWriterConfig(analyzer));
-
-    return ramIndexWriter;
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(new Directory[] { directory });
-
-    // delete this ram data
-    directory.close();
-  }
-
-  public void addDocument(Object[] values) throws IOException {
-
-    if (values.length != indexColumns.length + 3) {
-      throw new IOException("The column number (" + values.length + ") of the 
row  is incorrect.");
-    }
-    int rowId = (int) values[indexColumns.length + 2];
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
-      }
-      pageIndexWriter = createPageIndexWriter();
-    }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME,
-        new int[] { (int) values[columnsCount] }));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME,
-        new int[] { (int) values[columnsCount + 1] }));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, new int[] { rowId }));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
-
-    // add other fields
-    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      addField(doc, indexColumns[colIdx], dataTypes[colIdx], values[colIdx]);
-    }
-
-    pageIndexWriter.addDocument(doc);
-  }
-
-  private boolean addField(Document doc, String fieldName, DataType type, 
Object value) {
-    if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] 
{ Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no 
short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 
Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, new int[] { (int) value }));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, new long[] { (long) value }));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, new float[] { (float) value }));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, new double[] { (double) value }));
-    } else if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new 
int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
-    }
-    return true;
-  }
-
-  public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
-  }
-
-  public void close() throws IOException {
-    if (indexWriter != null) {
-      indexWriter.close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 7c0da5e..25cc228 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -491,8 +491,8 @@ m filterExpression
         if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
-          String uniqueTaskName = 
CarbonTablePath.getUniqueTaskName(blocklet.getTaskName());
-          segment.setFilteredIndexShardName(uniqueTaskName);
+          String shardName = 
CarbonTablePath.getShardName(blocklet.getFilePath());
+          segment.setFilteredIndexShardName(shardName);
         }
       }
       // Add to remove segments list if not present in pruned blocklets.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
index 540c8ca..d111594 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
@@ -45,7 +45,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll 
{
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
       """.stripMargin)
     checkExistence(sql("show datamap on table datamap_main"), true, 
"lucene_datamap")
     sql("drop datamap if exists lucene_datamap on table datamap_main")
@@ -64,12 +64,11 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
         s"""
            | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
            | USING 'lucene'
-           | DMProperties('TEXT_COLUMNS'='id')
+           | DMProperties('INDEX_COLUMNS'='id')
       """.stripMargin)
     }
     assert(exception_otherdataType.getMessage
-      .contains("TEXT_COLUMNS only supports String column. Unsupported column: 
id, " +
-                "DataType: INT"))
+      .contains("Only String column is supported, column 'id' is INT type."))
   }
 
   //Create Lucene DataMap With DMProperties on MainTable and Load Data and 
Query
@@ -84,7 +83,7 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll 
{
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
       """.stripMargin)
     sql(s"LOAD DATA INPATH '$csvPath' INTO TABLE datamap_main")
 
@@ -105,7 +104,7 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name,serialname')
+         | DMProperties('INDEX_COLUMNS'='country,name,serialname')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
     checkAnswer(sql("SELECT * FROM datamap_main WHERE 
TEXT_MATCH('country:ch*')"),
@@ -137,7 +136,7 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name')
+         | DMProperties('INDEX_COLUMNS'='country,name')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main")
@@ -163,7 +162,7 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country')
+         | DMProperties('INDEX_COLUMNS'='country')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE datamap_main 
OPTIONS('header'='false'," +
         
s"'BAD_RECORDS_LOGGER_ENABLE'='FALSE','BAD_RECORDS_ACTION'='FORCE','SINGLE_PASS'='TRUE')")
@@ -183,7 +182,7 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
       s"""
          | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
          | USING 'lucene'
-         | DMProperties('TEXT_COLUMNS'='country,name')
+         | DMProperties('INDEX_COLUMNS'='country,name')
       """.stripMargin)
     sql("insert into datamap_main select 1,'abc','aa'")
     sql("insert into datamap_main select 2,'def','ab'")
@@ -210,11 +209,11 @@ class LuceneTestCase extends QueryTest with 
BeforeAndAfterAll {
         s"""
            | CREATE DATAMAP lucene_datamap ON TABLE datamap_main
            | USING 'lucene'
-           | DMProperties('TEXT_COLUMNS'='country')
+           | DMProperties('INDEX_COLUMNS'='country')
       """.stripMargin)
     }
     assert(exception_dicitionaryinclude.getMessage
-      .contains("TEXT_COLUMNS cannot contain dictionary column country"))
+      .contains("Dictionary column is not supported, column 'country' is 
dictionary column"))
     sql("drop datamap if exists lucene_datamap on table datamap_main")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
index 245d147..5e2534c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -55,7 +55,7 @@ class LuceneCoarseGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
         s"""
            | CREATE DATAMAP dm ON TABLE datamap_test
            | USING 'lucene'
-           | DMProperties('TEXT_COLUMNS'='name,city')
+           | DMProperties('INDEX_COLUMNS'='name,city')
       """.stripMargin)
 
      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
