http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
deleted file mode 100644
index a702a6b..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.partition.impl;
-
-import java.util.List;
-
-import 
org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.partition.Partitioner;
-
-/**
- * Hash partitioner implementation
- */
-public class HashPartitionerImpl implements Partitioner<Object[]> {
-
-  private int numberOfBuckets;
-
-  private Hash[] hashes;
-
-  public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> 
columnSchemas,
-      int numberOfBuckets) {
-    this.numberOfBuckets = numberOfBuckets;
-    hashes = new Hash[indexes.size()];
-    for (int i = 0; i < indexes.size(); i++) {
-      switch(columnSchemas.get(i).getDataType()) {
-        case SHORT:
-        case INT:
-        case LONG:
-          hashes[i] = new IntegralHash(indexes.get(i));
-          break;
-        case DOUBLE:
-        case FLOAT:
-        case DECIMAL:
-          hashes[i] = new DecimalHash(indexes.get(i));
-          break;
-        default:
-          hashes[i] = new StringHash(indexes.get(i));
-      }
-    }
-  }
-
-  @Override public int getPartition(Object[] objects) {
-    int hashCode = 0;
-    for (Hash hash : hashes) {
-      hashCode += hash.getHash(objects);
-    }
-    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
-  }
-
-  private interface Hash {
-    int getHash(Object[] value);
-  }
-
-  private static class IntegralHash implements Hash {
-
-    private int index;
-
-    private IntegralHash(int index) {
-      this.index = index;
-    }
-
-    public int getHash(Object[] value) {
-      return value[index] != null ? 
Long.valueOf(value[index].toString()).hashCode() : 0;
-    }
-  }
-
-  private static class DecimalHash implements Hash {
-
-    private int index;
-
-    private DecimalHash(int index) {
-      this.index = index;
-    }
-
-    public int getHash(Object[] value) {
-      return value[index] != null ? 
Double.valueOf(value[index].toString()).hashCode() : 0;
-    }
-  }
-
-  private static class StringHash implements Hash {
-
-    private int index;
-
-    private StringHash(int index) {
-      this.index = index;
-    }
-
-    @Override public int getHash(Object[] value) {
-      return value[index] != null ? value[index].hashCode() : 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
index 10f7029..db5f068 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.core.reader;
 
 import java.io.IOException;
 
-import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
index dac2d43..854a23b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
@@ -19,16 +19,20 @@
 
 package org.apache.carbondata.core.reader;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
 
 import com.google.gson.Gson;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index a0922ec..15c0cbc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -24,14 +24,18 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
-import org.apache.carbondata.core.update.DeleteDeltaBlockletDetails;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockletDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.commons.lang.ArrayUtils;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
index c79c15d..21eb979 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
@@ -23,12 +23,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 
 import org.apache.thrift.TBase;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
index 75aecf3..fe515d9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImpl.java
@@ -25,12 +25,12 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory;
 import 
org.apache.carbondata.core.cache.dictionary.ColumnDictionaryChunkIterator;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.ColumnDictionaryChunk;
 
 import org.apache.thrift.TBase;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java 
b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 81b7661..0e3a3e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -22,7 +22,7 @@ package org.apache.carbondata.core.reader;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.thrift.TBase;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
index 00ae688..e7da781 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
@@ -21,18 +21,18 @@ package org.apache.carbondata.core.reader.sortindex;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.carbondata.common.factory.CarbonCommonFactory;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
 import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
 import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
 import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.ColumnSortInfo;
 
 import org.apache.thrift.TBase;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java
new file mode 100644
index 0000000..7ea0625
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/ScannedResultCollector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.collector;
+
+import java.util.List;
+
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * Interface which will be used to aggregate the scan result
+ */
+public interface ScannedResultCollector {
+
+  /**
+   * Collects rows from the scanned result.
+   * @param scannedResult scanned result
+   * @param batchSize upper bound on the number of rows collected in one call
+   * @return the collected rows, one Object[] per record
+   */
+  List<Object[]> collectData(AbstractScannedResult scannedResult, int 
batchSize);
+
+  /**
+   * Collects data in columnar format.
+   * @param scannedResult scanned result to collect from
+   * @param columnarBatch columnar batch handed to the collector (presumably filled in place)
+   */
+  void collectVectorBatch(AbstractScannedResult scannedResult, 
CarbonColumnarBatch columnarBatch);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
new file mode 100644
index 0000000..9235c97
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * Base class for result collectors; holds block execution info and measure metadata.
+ */
+public abstract class AbstractScannedResultCollector implements 
ScannedResultCollector {
+
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(AbstractScannedResultCollector.class.getName());
+
+  /**
+   * restructuring info
+   */
+  private KeyStructureInfo restructureInfos;
+
+  /**
+   * table block execution infos
+   */
+  protected BlockExecutionInfo tableBlockExecutionInfos;
+
+  /**
+   * Measure ordinals
+   */
+  protected int[] measuresOrdinal;
+
+  /**
+   * to check whether measure exists in current table block or not this to
+   * handle restructuring scenario
+   */
+  protected boolean[] isMeasureExistsInCurrentBlock;
+
+  /**
+   * default value of the measures in case of restructuring some measure wont
+   * be present in the table so in that default value will be used to
+   * aggregate the data for that measure columns
+   */
+  private Object[] measureDefaultValue;
+
+  /**
+   * measure datatypes.
+   */
+  protected DataType[] measureDatatypes;
+
+  public AbstractScannedResultCollector(BlockExecutionInfo 
blockExecutionInfos) {
+    this.tableBlockExecutionInfos = blockExecutionInfos;
+    restructureInfos = blockExecutionInfos.getKeyStructureInfo();
+    measuresOrdinal = 
tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
+    isMeasureExistsInCurrentBlock = 
tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
+    measureDefaultValue = 
tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
+    this.measureDatatypes = 
tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
+  }
+
+  protected void fillMeasureData(Object[] msrValues, int offset,
+      AbstractScannedResult scannedResult) {
+    for (short i = 0; i < measuresOrdinal.length; i++) {
+      // if the measure exists in the current block, read its value from the
+      // measure column data chunk at the current row id
+      if (isMeasureExistsInCurrentBlock[i]) {
+        msrValues[i + offset] = 
getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
+            scannedResult.getCurrenrRowId(), measureDatatypes[i]);
+      } else {
+        // otherwise fall back to the default value configured for that measure
+        msrValues[i + offset] = measureDefaultValue[i];
+      }
+    }
+  }
+
+  private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, 
DataType dataType) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      switch (dataType) {
+        case SHORT:
+        case INT:
+        case LONG:
+          return 
dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+        case DECIMAL:
+          return org.apache.spark.sql.types.Decimal
+              
.apply(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
+        default:
+          return 
dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Updates the keys of the collected rows when a fixed key update is required
+   */
+  protected void updateData(List<Object[]> listBasedResult) {
+    if (tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
+      updateKeyWithLatestBlockKeygenerator(listBasedResult);
+    }
+  }
+
+  /**
+   * Re-packs the key (element 0) of every row with the latest block key
+   * generator. A KeyGenException is only logged, not rethrown.
+   *
+   * @param listBasedResult rows whose first element is a ByteArrayWrapper key
+   */
+  private void updateKeyWithLatestBlockKeygenerator(List<Object[]> 
listBasedResult) {
+    try {
+      long[] data = null;
+      ByteArrayWrapper key = null;
+      for (int i = 0; i < listBasedResult.size(); i++) {
+        // get the key
+        key = (ByteArrayWrapper) listBasedResult.get(i)[0];
+        // unpack the key with table block key generator
+        data = tableBlockExecutionInfos.getBlockKeyGenerator()
+            .getKeyArray(key.getDictionaryKey(), 
tableBlockExecutionInfos.getMaskedByteForBlock());
+        // pack the key with the latest block key generator
+        // and generate the masked key for that key
+        key.setDictionaryKey(QueryUtil
+            .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
+                restructureInfos.getMaxKey(), 
restructureInfos.getMaskByteRanges(),
+                restructureInfos.getMaskByteRanges().length));
+      }
+    } catch (KeyGenException e) {
+      LOGGER.error(e);
+    }
+  }
+
+  @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    throw new UnsupportedOperationException("Works only for batch collectors");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
new file mode 100644
index 0000000..391f5da
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import org.apache.commons.lang3.ArrayUtils;
+/**
+ * Row-based collector: fills dimension and measure values into Object[] rows in query order.
+ */
+public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollector {
+
+  public DictionaryBasedResultCollector(BlockExecutionInfo 
blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Adds each scanned record (dimension and measure values) to the result list,
+   * counting the rows added so that at most batchSize are returned, to handle limit 
scenario
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult 
scannedResult, int batchSize) {
+
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
+    boolean isMsrsPresent = measureDatatypes.length > 0;
+
+    QueryDimension[] queryDimensions = 
tableBlockExecutionInfos.getQueryDimensions();
+    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          
queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+    int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
+        new Integer[dictionaryIndexes.size()]));
+    Arrays.sort(primitive);
+    int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+    int index = 0;
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          
queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        actualIndexInSurrogateKey[index++] = Arrays.binarySearch(primitive,
+            queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+
+    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
+        scannedResult.getDeleteDeltaDataCache();
+    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+        tableBlockExecutionInfos.getComlexDimensionInfoMap();
+    boolean[] dictionaryEncodingArray = 
CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+    boolean[] directDictionaryEncodingArray =
+        CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+    boolean[] implictColumnArray = 
CarbonUtil.getImplicitColumnArray(queryDimensions);
+    boolean[] complexDataTypeArray = 
CarbonUtil.getComplexDataTypeArray(queryDimensions);
+    int dimSize = queryDimensions.length;
+    boolean isDimensionsExist = dimSize > 0;
+    int[] order = new int[dimSize + queryMeasures.length];
+    for (int i = 0; i < dimSize; i++) {
+      order[i] = queryDimensions[i].getQueryOrder();
+    }
+    for (int i = 0; i < queryMeasures.length; i++) {
+      order[i + dimSize] = queryMeasures[i].getQueryOrder();
+    }
+    // scan the records and add each one to the list until batchSize rows are collected
+    int rowCounter = 0;
+    int dictionaryColumnIndex = 0;
+    int noDictionaryColumnIndex = 0;
+    int complexTypeColumnIndex = 0;
+    int[] surrogateResult;
+    String[] noDictionaryKeys;
+    byte[][] complexTypeKeyArray;
+    while (scannedResult.hasNext() && rowCounter < batchSize) {
+      Object[] row = new Object[dimSize + queryMeasures.length];
+      if (isDimensionsExist) {
+        surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
+        noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
+        complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+        dictionaryColumnIndex = 0;
+        noDictionaryColumnIndex = 0;
+        complexTypeColumnIndex = 0;
+        for (int i = 0; i < dimSize; i++) {
+          if (!dictionaryEncodingArray[i]) {
+            if (implictColumnArray[i]) {
+              if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
+                  .equals(queryDimensions[i].getDimension().getColName())) {
+                row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
+                    scannedResult.getBlockletId() + 
CarbonCommonConstants.FILE_SEPARATOR
+                        + scannedResult.getCurrenrRowId(), DataType.STRING);
+              } else {
+                row[order[i]] = DataTypeUtil
+                    .getDataBasedOnDataType(scannedResult.getBlockletId(), 
DataType.STRING);
+              }
+            } else {
+              row[order[i]] = DataTypeUtil
+                  
.getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
+                      queryDimensions[i].getDimension().getDataType());
+            }
+          } else if (directDictionaryEncodingArray[i]) {
+            DirectDictionaryGenerator directDictionaryGenerator =
+                DirectDictionaryKeyGeneratorFactory
+                    
.getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+            if (directDictionaryGenerator != null) {
+              row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
+                  
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
+            }
+          } else if (complexDataTypeArray[i]) {
+            row[order[i]] = comlexDimensionInfoMap
+                .get(queryDimensions[i].getDimension().getOrdinal())
+                .getDataBasedOnDataTypeFromSurrogates(
+                    
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+          } else {
+            row[order[i]] = 
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
+          }
+        }
+
+      } else {
+        scannedResult.incrementCounter();
+      }
+      if (null != deleteDeltaDataCache && deleteDeltaDataCache
+          .contains(scannedResult.getCurrenrRowId())) {
+        continue;
+      }
+      if (isMsrsPresent) {
+        Object[] msrValues = new Object[measureDatatypes.length];
+        fillMeasureData(msrValues, 0, scannedResult);
+        for (int i = 0; i < msrValues.length; i++) {
+          row[order[i + dimSize]] = msrValues[i];
+        }
+      }
+      listBasedResult.add(row);
+      rowCounter++;
+    }
+    return listBasedResult;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
new file mode 100644
index 0000000..401dffd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import 
org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
+
+/**
+ * Result collector for the vectorized read path. Instead of building row arrays it copies the
+ * scanned dictionary, no-dictionary, complex and measure columns into the column vectors of a
+ * {@link CarbonColumnarBatch}. Row based collection through {@link #collectData} is rejected.
+ */
+public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
+
+  /** dimensions with dictionary or direct dictionary encoding, kept sorted */
+  private ColumnVectorInfo[] dictionaryInfo;
+
+  /** dimensions without dictionary encoding, kept sorted */
+  private ColumnVectorInfo[] noDictionaryInfo;
+
+  /** complex dimensions, kept sorted */
+  private ColumnVectorInfo[] complexInfo;
+
+  /** one entry per query measure, in the order of the query measures */
+  private ColumnVectorInfo[] measureInfo;
+
+  /** every projected dimension and measure, placed at its query order position */
+  private ColumnVectorInfo[] allColumnInfo;
+
+  /**
+   * Prepares one {@link ColumnVectorInfo} per projected dimension and measure and groups the
+   * dimension infos by the way they have to be filled.
+   *
+   * @param blockExecutionInfos execution details of the block being scanned
+   */
+  public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+    QueryDimension[] dims = tableBlockExecutionInfos.getQueryDimensions();
+    QueryMeasure[] msrs = tableBlockExecutionInfos.getQueryMeasures();
+    measureInfo = new ColumnVectorInfo[msrs.length];
+    allColumnInfo = new ColumnVectorInfo[dims.length + msrs.length];
+    List<ColumnVectorInfo> dictionaryColumns = new ArrayList<>();
+    List<ColumnVectorInfo> noDictionaryColumns = new ArrayList<>();
+    List<ColumnVectorInfo> complexColumns = new ArrayList<>();
+    for (QueryDimension queryDimension : dims) {
+      // the part that is common to every kind of dimension
+      ColumnVectorInfo info = new ColumnVectorInfo();
+      info.dimension = queryDimension;
+      info.ordinal = queryDimension.getDimension().getOrdinal();
+      allColumnInfo[queryDimension.getQueryOrder()] = info;
+      // the part that depends on the encoding of the dimension
+      if (!queryDimension.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        noDictionaryColumns.add(info);
+      } else if (queryDimension.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        info.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
+        dictionaryColumns.add(info);
+      } else if (queryDimension.getDimension().isComplex()) {
+        info.genericQueryType =
+            tableBlockExecutionInfos.getComlexDimensionInfoMap().get(info.ordinal);
+        complexColumns.add(info);
+      } else {
+        dictionaryColumns.add(info);
+      }
+    }
+    for (int i = 0; i < msrs.length; i++) {
+      QueryMeasure queryMeasure = msrs[i];
+      ColumnVectorInfo info = new ColumnVectorInfo();
+      info.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory
+          .getMeasureVectorFiller(queryMeasure.getMeasure().getDataType());
+      info.ordinal = queryMeasure.getMeasure().getOrdinal();
+      info.measure = queryMeasure;
+      measureInfo[i] = info;
+      allColumnInfo[queryMeasure.getQueryOrder()] = info;
+    }
+    dictionaryInfo = toSortedArray(dictionaryColumns);
+    noDictionaryInfo = toSortedArray(noDictionaryColumns);
+    complexInfo = toSortedArray(complexColumns);
+  }
+
+  /**
+   * Copies the list into an array and sorts it in the natural order of its elements.
+   */
+  private static ColumnVectorInfo[] toSortedArray(List<ColumnVectorInfo> infos) {
+    ColumnVectorInfo[] sorted = infos.toArray(new ColumnVectorInfo[infos.size()]);
+    Arrays.sort(sorted);
+    return sorted;
+  }
+
+  /**
+   * Row based collection is not available in this collector.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    throw new UnsupportedOperationException("collectData is not supported here");
+  }
+
+  /**
+   * Fills the batch with as many rows as both the scanned result and the batch can still take.
+   * Nothing is touched when that number is smaller than one.
+   *
+   * @param scannedResult result to read the rows from, its row counter is moved forward
+   * @param columnarBatch batch to fill, its actual size and row counter are moved forward
+   */
+  @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    int startRow = scannedResult.getRowCounter();
+    int leftInResult = scannedResult.numberOfOutputRows() - startRow;
+    int leftInBatch = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
+    int rowsToFill = Math.min(leftInBatch, leftInResult);
+    if (rowsToFill < 1) {
+      return;
+    }
+    // point every column at its slice of the result and at its target vector
+    for (int i = 0; i < allColumnInfo.length; i++) {
+      ColumnVectorInfo info = allColumnInfo[i];
+      info.size = rowsToFill;
+      info.offset = startRow;
+      info.vectorOffset = columnarBatch.getRowCounter();
+      info.vector = columnarBatch.columnVectors[i];
+    }
+
+    scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
+    scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
+    scannedResult.fillColumnarMeasureBatch(measureInfo, measuresOrdinal);
+    scannedResult.fillColumnarComplexBatch(complexInfo);
+    scannedResult.setRowCounter(startRow + rowsToFill);
+    columnarBatch.setActualSize(columnarBatch.getActualSize() + rowsToFill);
+    columnarBatch.setRowCounter(columnarBatch.getRowCounter() + rowsToFill);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
new file mode 100644
index 0000000..a738402
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.collector.impl;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import 
org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+
+/**
+ * Collector that hands the scanned rows over in raw form: the first element of every row is a
+ * {@link ByteArrayWrapper} carrying the dictionary, no-dictionary, complex and implicit column
+ * bytes, the remaining elements hold the measure values.
+ */
+public class RawBasedResultCollector extends AbstractScannedResultCollector {
+
+  public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+  }
+
+  /**
+   * Reads rows from the scanned result until it is exhausted or batchSize rows were collected.
+   * A row whose id is found in the delete delta cache is dropped and does not count towards
+   * batchSize.
+   *
+   * @param scannedResult result to read the rows from
+   * @param batchSize     maximum number of rows to return
+   * @return the collected rows after updateData was applied to them
+   */
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    List<Object[]> collectedRows = new ArrayList<>(batchSize);
+    QueryMeasure[] measures = tableBlockExecutionInfos.getQueryMeasures();
+    BlockletLevelDeleteDeltaDataCache deletedRows = scannedResult.getDeleteDeltaDataCache();
+    int collected = 0;
+    while (scannedResult.hasNext() && collected < batchSize) {
+      // NOTE(review): the keys are read before the delete check exactly as before; presumably
+      // reading them also moves the scan position, confirm before reordering these calls
+      ByteArrayWrapper keys = new ByteArrayWrapper();
+      keys.setDictionaryKey(scannedResult.getDictionaryKeyArray());
+      keys.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
+      keys.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
+      keys.setImplicitColumnByteArray(scannedResult.getBlockletId()
+          .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      boolean deleted =
+          null != deletedRows && deletedRows.contains(scannedResult.getCurrenrRowId());
+      if (!deleted) {
+        Object[] row = new Object[1 + measures.length];
+        row[0] = keys;
+        fillMeasureData(row, 1, scannedResult);
+        collectedRows.add(row);
+        collected++;
+      }
+    }
+    updateData(collectedRows);
+    return collectedRows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
new file mode 100644
index 0000000..3061024
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.util.*;
+import org.apache.spark.sql.types.*;
+
+/**
+ * Query type of an array column. An array has exactly one child type which describes its
+ * elements; the child is kept in the {@code children} field inherited from
+ * {@link ComplexQueryType}.
+ */
+public class ArrayQueryType extends ComplexQueryType implements GenericQueryType {
+
+  public ArrayQueryType(String name, String parentname, int blockIndex) {
+    super(name, parentname, blockIndex);
+  }
+
+  /**
+   * Registers the element type when its parent name is the name of this array, otherwise the
+   * child is passed down to the element type registered so far.
+   *
+   * @param children type to register
+   */
+  @Override public void addChildren(GenericQueryType children) {
+    if (this.getName().equals(children.getParentname())) {
+      this.children = children;
+    } else if (null != this.children) {
+      // a struct forwards every child it does not own to all of its children, so an array
+      // without an element type can receive an unrelated child; there is nothing to pass it to
+      this.children.addChildren(children);
+    }
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+
+  }
+
+  /**
+   * Writes the array of the given row to the stream: first the number of elements, then every
+   * element as written by the element type.
+   *
+   * @param dimensionColumnDataChunks chunks of all dimension blocks
+   * @param rowNumber                 row to read in the block of this array
+   * @param dataOutputStream          stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    // the first int is the number of elements, the second int is read only for a non empty
+    // array and is used as the row number of the first element in the child block
+    byte[] input = new byte[8];
+    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+    ByteBuffer byteArray = ByteBuffer.wrap(input);
+    int dataLength = byteArray.getInt();
+    dataOutputStream.writeInt(dataLength);
+    if (dataLength > 0) {
+      int columnIndex = byteArray.getInt();
+      for (int i = 0; i < dataLength; i++) {
+        children.parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks,
+            columnIndex++, dataOutputStream);
+      }
+    }
+  }
+
+  /**
+   * @return column count of the element type plus one for the array itself
+   */
+  @Override public int getColsCount() {
+    return children.getColsCount() + 1;
+  }
+
+  /**
+   * @return an array type without element type whose elements may be null
+   */
+  @Override public DataType getSchemaType() {
+    return new ArrayType(null, true);
+  }
+
+  /**
+   * Reads the block of this array and then lets the element type read its own blocks.
+   */
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder)
+      throws IOException {
+    readBlockDataChunk(blockChunkHolder);
+    children.fillRequiredBlockData(blockChunkHolder);
+  }
+
+  /**
+   * Converts the surrogate bytes of one array to its actual values. The buffer has to start
+   * with the number of elements, a length of -1 stands for a null array.
+   *
+   * @param surrogateData buffer positioned at the start of the array, it is read forward
+   * @return the converted elements wrapped in a GenericArrayData, or null for a null array
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    int dataLength = surrogateData.getInt();
+    if (dataLength == -1) {
+      return null;
+    }
+    Object[] data = new Object[dataLength];
+    for (int i = 0; i < dataLength; i++) {
+      data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+    }
+    return new GenericArrayData(data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
new file mode 100644
index 0000000..75ed3ff
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.complextypes;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+
+/**
+ * Common state and block access helpers of the complex query types.
+ */
+public class ComplexQueryType {
+  protected GenericQueryType children;
+
+  protected String name;
+
+  protected String parentname;
+
+  protected int blockIndex;
+
+  public ComplexQueryType(String name, String parentname, int blockIndex) {
+    this.name = name;
+    this.parentname = parentname;
+    this.blockIndex = blockIndex;
+  }
+
+  /**
+   * Copies the data of one row of the block of this type to the start of the passed byte[].
+   * The whole row data is copied, so the target has to be at least as long as the row data.
+   *
+   * @param dimensionColumnDataChunks chunks of all dimension blocks, indexed by block index
+   * @param rowNumber                 row to read
+   * @param input                     target of the copy
+   */
+  protected void copyBlockDataChunk(DimensionColumnDataChunk[] dimensionColumnDataChunks,
+      int rowNumber, byte[] input) {
+    byte[] rowData = dimensionColumnDataChunks[blockIndex].getChunkData(rowNumber);
+    System.arraycopy(rowData, 0, input, 0, rowData.length);
+  }
+
+  /**
+   * Reads the dimension chunk of the block of this type into the holder, unless the holder
+   * already has a chunk for that block.
+   *
+   * @param blockChunkHolder holder to fill
+   * @throws IOException if the chunk can not be read
+   */
+  protected void readBlockDataChunk(BlocksChunkHolder blockChunkHolder) throws IOException {
+    if (null != blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      return;
+    }
+    blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+        .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
new file mode 100644
index 0000000..51e0252
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.Bits;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.TimestampType$;
+
+/**
+ * Query type of a primitive value inside a complex column. It is a leaf, it has no children.
+ * Name, parent name and block index are the fields inherited from {@link ComplexQueryType}.
+ */
+public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType {
+
+  /** number of bytes of one surrogate key */
+  private int keySize;
+
+  /** used to look up the value of a surrogate key when the column is not direct dictionary */
+  private Dictionary dictionary;
+
+  private org.apache.carbondata.core.metadata.datatype.DataType dataType;
+
+  private boolean isDirectDictionary;
+
+  public PrimitiveQueryType(String name, String parentname, int blockIndex,
+      org.apache.carbondata.core.metadata.datatype.DataType dataType, int keySize,
+      Dictionary dictionary, boolean isDirectDictionary) {
+    // name, parent name and block index are stored by the super class
+    super(name, parentname, blockIndex);
+    this.dataType = dataType;
+    this.keySize = keySize;
+    this.dictionary = dictionary;
+    this.isDirectDictionary = isDirectDictionary;
+  }
+
+  /**
+   * Does nothing, a primitive type has no children.
+   */
+  @Override public void addChildren(GenericQueryType children) {
+
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+
+  }
+
+  @Override public int getColsCount() {
+    return 1;
+  }
+
+  /**
+   * Writes the value of the given row to the stream. The number of bytes written is the column
+   * value size of the block of this type.
+   *
+   * @param dimensionDataChunks chunks of all dimension blocks
+   * @param rowNumber           row to read in the block of this type
+   * @param dataOutputStream    stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] currentVal =
+        new byte[dimensionDataChunks[blockIndex].getColumnValueSize()];
+    copyBlockDataChunk(dimensionDataChunks, rowNumber, currentVal);
+    dataOutputStream.write(currentVal);
+  }
+
+  /**
+   * @return the spark type for the carbon data type, any data type that is not listed here
+   * is reported as integer type
+   */
+  @Override public DataType getSchemaType() {
+    switch (dataType) {
+      case INT:
+        return IntegerType$.MODULE$;
+      case DOUBLE:
+        return DoubleType$.MODULE$;
+      case LONG:
+        return LongType$.MODULE$;
+      case BOOLEAN:
+        return BooleanType$.MODULE$;
+      case TIMESTAMP:
+        return TimestampType$.MODULE$;
+      case DATE:
+        return DateType$.MODULE$;
+      default:
+        return IntegerType$.MODULE$;
+    }
+  }
+
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder)
+      throws IOException {
+    readBlockDataChunk(blockChunkHolder);
+  }
+
+  /**
+   * Reads one surrogate key of keySize bytes from the buffer and converts it to the actual
+   * value, either with the direct dictionary generator of the data type or by looking the key
+   * up in the dictionary and converting the dictionary value to the data type.
+   *
+   * @param surrogateData buffer positioned at the surrogate key, it is read forward
+   * @return the actual value
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    byte[] data = new byte[keySize];
+    surrogateData.get(data);
+    Bits bit = new Bits(new int[]{keySize * 8});
+    int surrogateValue = (int)bit.getKeyArray(data, 0)[0];
+    Object actualData = null;
+    if (isDirectDictionary) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataType);
+      actualData = directDictionaryGenerator.getValueFromSurrogate(surrogateValue);
+    } else {
+      String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrogateValue);
+      actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
+    }
+    return actualData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
new file mode 100644
index 0000000..c0b6f1b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.complextypes;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Query type of a struct column. Name and parent name are the fields inherited from
+ * {@link ComplexQueryType}.
+ */
+public class StructQueryType extends ComplexQueryType implements GenericQueryType {
+
+  /**
+   * types of the struct fields in the order they were added; a struct can have many children,
+   * so this list is used instead of the single child field of the super class
+   */
+  private List<GenericQueryType> children = new ArrayList<GenericQueryType>();
+
+  public StructQueryType(String name, String parentname, int blockIndex) {
+    // name and parent name are stored by the super class
+    super(name, parentname, blockIndex);
+  }
+
+  /**
+   * Adds the child as a field when its parent name is the name of this struct, otherwise the
+   * child is passed down to every field added so far.
+   *
+   * @param newChild type to register
+   */
+  @Override public void addChildren(GenericQueryType newChild) {
+    if (this.getName().equals(newChild.getParentname())) {
+      this.children.add(newChild);
+    } else {
+      for (GenericQueryType child : this.children) {
+        child.addChildren(newChild);
+      }
+    }
+
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override public String getParentname() {
+    return parentname;
+  }
+
+  @Override public void setParentname(String parentname) {
+    this.parentname = parentname;
+
+  }
+
+  /**
+   * @return sum of the column counts of all fields plus one for the struct itself
+   */
+  @Override public int getColsCount() {
+    int colsCount = 1;
+    for (GenericQueryType child : children) {
+      colsCount += child.getColsCount();
+    }
+    return colsCount;
+  }
+
+  /**
+   * Writes the struct of the given row to the stream: first the number of fields, then every
+   * field as written by its type. The fields are read from the same row number as the struct.
+   *
+   * @param dimensionColumnDataChunks chunks of all dimension blocks
+   * @param rowNumber                 row to read
+   * @param dataOutputStream          stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override public void parseBlocksAndReturnComplexColumnByteArray(
+      DimensionColumnDataChunk[] dimensionColumnDataChunks, int rowNumber,
+      DataOutputStream dataOutputStream) throws IOException {
+    byte[] input = new byte[8];
+    copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, input);
+    ByteBuffer byteArray = ByteBuffer.wrap(input);
+    int childElement = byteArray.getInt();
+    dataOutputStream.writeInt(childElement);
+    // the loop does not run for a field count of zero or less, no extra check is needed
+    for (int i = 0; i < childElement; i++) {
+      children.get(i).parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks,
+          rowNumber, dataOutputStream);
+    }
+  }
+
+  /**
+   * @return a struct type with one nullable field per child, named after the child and
+   * without data type
+   */
+  @Override public DataType getSchemaType() {
+    StructField[] fields = new StructField[children.size()];
+    for (int i = 0; i < children.size(); i++) {
+      fields[i] = new StructField(children.get(i).getName(), null, true,
+          Metadata.empty());
+    }
+    return new StructType(fields);
+  }
+
+  /**
+   * Reads the block of this struct and then lets every field read its own blocks.
+   */
+  @Override public void fillRequiredBlockData(BlocksChunkHolder blockChunkHolder)
+      throws IOException {
+    readBlockDataChunk(blockChunkHolder);
+
+    for (GenericQueryType child : children) {
+      child.fillRequiredBlockData(blockChunkHolder);
+    }
+  }
+
+  /**
+   * Converts the surrogate bytes of one struct to its actual values. The buffer has to start
+   * with the number of fields, followed by the data of the fields in the order of the children.
+   *
+   * @param surrogateData buffer positioned at the start of the struct, it is read forward
+   * @return the converted fields wrapped in a GenericInternalRow
+   */
+  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
+    int childLength = surrogateData.getInt();
+    Object[] fields = new Object[childLength];
+    for (int i = 0; i < childLength; i++) {
+      fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
+    }
+
+    return new GenericInternalRow(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java
new file mode 100644
index 0000000..7b324a2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.executor;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryModel;
+
+/**
+ * Interface for the carbon query executor.
+ * An executor runs a query described by a query model
+ * and returns an iterator over the query result.
+ *
+ * @param <E> type of the elements returned by the result iterator
+ */
+public interface QueryExecutor<E> {
+
+  /**
+   * Executes the query described by the query model passed from the driver.
+   *
+   * @param queryModel query details
+   * @return query result iterator
+   * @throws QueryExecutionException if any failure occurs while executing the query
+   * @throws IOException if reading the files fails
+   */
+  CarbonIterator<E> execute(QueryModel queryModel)
+      throws QueryExecutionException, IOException;
+
+  /**
+   * Cleans up after the query.
+   * NOTE(review): presumably has to be called once the result iterator is no longer used,
+   * confirm with the implementations.
+   *
+   * @throws QueryExecutionException declared for failures during the cleanup
+   */
+  void finish() throws QueryExecutionException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
new file mode 100644
index 0000000..293129b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.executor;
+
+import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
+import org.apache.carbondata.core.scan.model.QueryModel;
+
+/**
+ * Factory class to get the query executor from RDD
+ * This will return the executor based on query type
+ */
+public class QueryExecutorFactory {
+
+  /** Returns the vector executor for vector-reader models, otherwise the detail executor. */
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+    if (!queryModel.isVectorReader()) {
+      return new DetailQueryExecutor();
+    }
+    return new VectorDetailQueryExecutor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java
new file mode 100644
index 0000000..0ac1f05
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/exception/QueryExecutionException.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.executor.exception;
+
+import java.util.Locale;
+
+/**
+ * Exception class for query execution
+ *
+ * @author Administrator
+ */
+public class QueryExecutionException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg       The error message for this exception.
+   */
+  public QueryExecutionException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg       The error message for this exception.
+   */
+  public QueryExecutionException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t
+   */
+  public QueryExecutionException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+
+}

Reply via email to