http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java
new file mode 100644
index 0000000..d7866c1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.filesystem;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.Path;
+
+public class LocalCarbonFile implements CarbonFile {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalCarbonFile.class.getName());
+  private File file;
+
+  public LocalCarbonFile(String filePath) {
+    Path pathWithoutSchemeAndAuthority = 
Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+    file = new File(pathWithoutSchemeAndAuthority.toString());
+  }
+
+  public LocalCarbonFile(File file) {
+    this.file = file;
+  }
+
+  @Override public String getAbsolutePath() {
+    return file.getAbsolutePath();
+  }
+
+  @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    if (!file.isDirectory()) {
+      return null;
+    }
+
+    File[] files = file.listFiles(new FileFilter() {
+
+      @Override public boolean accept(File pathname) {
+        return fileFilter.accept(new LocalCarbonFile(pathname));
+      }
+    });
+
+    if (files == null) {
+      return new CarbonFile[0];
+    }
+
+    CarbonFile[] carbonFiles = new CarbonFile[files.length];
+
+    for (int i = 0; i < carbonFiles.length; i++) {
+      carbonFiles[i] = new LocalCarbonFile(files[i]);
+    }
+
+    return carbonFiles;
+  }
+
+  @Override public String getName() {
+    return file.getName();
+  }
+
+  @Override public boolean isDirectory() {
+    return file.isDirectory();
+  }
+
+  @Override public boolean exists() {
+    if (file != null) {
+      return file.exists();
+    }
+    return false;
+  }
+
+  @Override public String getCanonicalPath() {
+    try {
+      return file.getCanonicalPath();
+    } catch (IOException e) {
+      LOGGER
+          .error(e, "Exception occured" + e.getMessage());
+    }
+    return null;
+  }
+
+  @Override public CarbonFile getParentFile() {
+    return new LocalCarbonFile(file.getParentFile());
+  }
+
+  @Override public String getPath() {
+    return file.getPath();
+  }
+
+  @Override public long getSize() {
+    return file.length();
+  }
+
+  public boolean renameTo(String changetoName) {
+    return file.renameTo(new File(changetoName));
+  }
+
+  public boolean delete() {
+    return file.delete();
+  }
+
+  @Override public CarbonFile[] listFiles() {
+
+    if (!file.isDirectory()) {
+      return null;
+    }
+    File[] files = file.listFiles();
+    if (files == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] carbonFiles = new CarbonFile[files.length];
+    for (int i = 0; i < carbonFiles.length; i++) {
+      carbonFiles[i] = new LocalCarbonFile(files[i]);
+    }
+
+    return carbonFiles;
+
+  }
+
+  @Override public boolean createNewFile() {
+    try {
+      return file.createNewFile();
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override public long getLastModifiedTime() {
+    return file.lastModified();
+  }
+
+  @Override public boolean setLastModifiedTime(long timestamp) {
+    return file.setLastModified(timestamp);
+  }
+
+  /**
+   * This method will delete the data in file data from a given offset
+   */
+  @Override public boolean truncate(String fileName, long validDataEndOffset) {
+    FileChannel source = null;
+    FileChannel destination = null;
+    boolean fileTruncatedSuccessfully = false;
+    // temporary file name
+    String tempWriteFilePath = fileName + 
CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
+    try {
+      CarbonFile tempFile = null;
+      // delete temporary file if it already exists at a given path
+      if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
+        tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+        tempFile.delete();
+      }
+      // create new temporary file
+      FileFactory.createNewFile(tempWriteFilePath, fileType);
+      tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
+      source = new FileInputStream(fileName).getChannel();
+      destination = new FileOutputStream(tempWriteFilePath).getChannel();
+      long read = destination.transferFrom(source, 0, validDataEndOffset);
+      long totalBytesRead = read;
+      long remaining = validDataEndOffset - totalBytesRead;
+      // read till required data offset is not reached
+      while (remaining > 0) {
+        read = destination.transferFrom(source, totalBytesRead, remaining);
+        totalBytesRead = totalBytesRead + read;
+        remaining = remaining - totalBytesRead;
+      }
+      CarbonUtil.closeStreams(source, destination);
+      // rename the temp file to original file
+      tempFile.renameForce(fileName);
+      fileTruncatedSuccessfully = true;
+    } catch (IOException e) {
+      LOGGER.error("Exception occured while truncating the file " + 
e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(source, destination);
+    }
+    return fileTruncatedSuccessfully;
+  }
+
+  /**
+   * This method will be used to check whether a file has been modified or not
+   *
+   * @param fileTimeStamp time to be compared with latest timestamp of file
+   * @param endOffset     file length to be compared with current length of 
file
+   * @return
+   */
+  @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
+    boolean isFileModified = false;
+    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
+      isFileModified = true;
+    }
+    return isFileModified;
+  }
+
+  @Override public boolean renameForce(String changetoName) {
+    File destFile = new File(changetoName);
+    if (destFile.exists()) {
+      if (destFile.delete()) {
+        return file.renameTo(new File(changetoName));
+      }
+    }
+
+    return file.renameTo(new File(changetoName));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java
new file mode 100644
index 0000000..3fcf387
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.filesystem;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+
+public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
+
+  public ViewFSCarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public ViewFSCarbonFile(Path path) {
+    super(path);
+  }
+
+  public ViewFSCarbonFile(FileStatus fileStatus) {
+    super(fileStatus);
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new ViewFSCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return null;
+      }
+    } catch (IOException ex) {
+      LOGGER.error("Exception occured" + ex.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
+    CarbonFile[] files = listFiles();
+    if (files != null && files.length >= 1) {
+      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
+      for (int i = 0; i < files.length; i++) {
+        if (fileFilter.accept(files[i])) {
+          fileList.add(files[i]);
+        }
+      }
+      if (fileList.size() >= 1) {
+        return fileList.toArray(new CarbonFile[fileList.size()]);
+      } else {
+        return new CarbonFile[0];
+      }
+    }
+    return files;
+  }
+
+  @Override public CarbonFile getParentFile() {
+    Path parent = fileStatus.getPath().getParent();
+    return null == parent ? null : new ViewFSCarbonFile(parent);
+  }
+
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      if (fs instanceof ViewFileSystem) {
+        fs.delete(new Path(changetoName), true);
+        fs.rename(fileStatus.getPath(), new Path(changetoName));
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured" + e.getMessage());
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java
new file mode 100644
index 0000000..6512d25
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.impl;
+
+import org.apache.carbondata.core.datastorage.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.MeasureDataWrapper;
+
+public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper {
+
+  private final CarbonReadDataHolder[] values;
+
+  public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) 
{
+    this.values = values;
+  }
+
+  @Override public CarbonReadDataHolder[] getValues() {
+    return values;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java
new file mode 100644
index 0000000..443344f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastorage.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.FileHolder;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+public class DFSFileHolderImpl implements FileHolder {
+  /**
+   * cache to hold filename and its stream
+   */
+  private Map<String, FSDataInputStream> fileNameAndStreamCache;
+
+  public DFSFileHolderImpl() {
+    this.fileNameAndStreamCache =
+        new HashMap<String, 
FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  @Override public byte[] readByteArray(String filePath, long offset, int 
length)
+      throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return read(fileChannel, length, offset);
+  }
+
+  /**
+   * This method will be used to check whether stream is already present in
+   * cache or not for filepath if not present then create it and then add to
+   * cache, other wise get from cache
+   *
+   * @param filePath fully qualified file path
+   * @return channel
+   */
+  private FSDataInputStream updateCache(String filePath) throws IOException {
+    FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
+    if (null == fileChannel) {
+      Path pt = new Path(filePath);
+      FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
+      fileChannel = fs.open(pt);
+      fileNameAndStreamCache.put(filePath, fileChannel);
+    }
+    return fileChannel;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be 
read and positon
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @param offset  position
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size, long offset) throws 
IOException {
+    byte[] byteBffer = new byte[size];
+    channel.seek(offset);
+    channel.readFully(byteBffer);
+    return byteBffer;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be 
read and positon
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size) throws IOException {
+    byte[] byteBffer = new byte[size];
+    channel.readFully(byteBffer);
+    return byteBffer;
+  }
+
+  @Override public int readInt(String filePath, long offset) throws 
IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readInt();
+  }
+
+  @Override public long readDouble(String filePath, long offset) throws 
IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readLong();
+  }
+
+  @Override public void finish() throws IOException {
+    for (Entry<String, FSDataInputStream> entry : 
fileNameAndStreamCache.entrySet()) {
+      FSDataInputStream channel = entry.getValue();
+      if (null != channel) {
+        channel.close();
+      }
+    }
+  }
+
+  @Override public byte[] readByteArray(String filePath, int length) throws 
IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return read(fileChannel, length);
+  }
+
+  @Override public long readLong(String filePath, long offset) throws 
IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readLong();
+  }
+
+  @Override public int readInt(String filePath) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return fileChannel.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java
new file mode 100644
index 0000000..ca9956d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.carbondata.core.datastorage.FileHolder;
+import org.apache.carbondata.core.datastorage.filesystem.AlluxioCarbonFile;
+import org.apache.carbondata.core.datastorage.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.filesystem.HDFSCarbonFile;
+import org.apache.carbondata.core.datastorage.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastorage.filesystem.ViewFSCarbonFile;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+
+public final class FileFactory {
+  private static Configuration configuration = null;
+
+  static {
+    configuration = new Configuration();
+    configuration.addResource(new Path("../core-default.xml"));
+  }
+
+  private FileFactory() {
+
+  }
+
+  public static Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public static FileHolder getFileHolder(FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new FileHolderImpl();
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        return new DFSFileHolderImpl();
+      default:
+        return new FileHolderImpl();
+    }
+  }
+
+  public static FileType getFileType(String path) {
+    if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
+      return FileType.HDFS;
+    }
+    else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+      return FileType.ALLUXIO;
+    }
+    else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+      return FileType.VIEWFS;
+    }
+    return FileType.LOCAL;
+  }
+
+  public static CarbonFile getCarbonFile(String path, FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new LocalCarbonFile(path);
+      case HDFS:
+        return new HDFSCarbonFile(path);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
+      default:
+        return new LocalCarbonFile(path);
+    }
+  }
+
+  public static DataInputStream getDataInputStream(String path, FileType 
fileType)
+      throws IOException {
+    return getDataInputStream(path, fileType, -1);
+  }
+
+  public static DataInputStream getDataInputStream(String path, FileType 
fileType, int bufferSize)
+      throws IOException {
+    path = path.replace("\\", "/");
+    boolean gzip = path.endsWith(".gz");
+    boolean bzip2 = path.endsWith(".bz2");
+    InputStream stream;
+    switch (fileType) {
+      case LOCAL:
+        if (gzip) {
+          stream = new GZIPInputStream(new FileInputStream(path));
+        } else if (bzip2) {
+          stream = new BZip2CompressorInputStream(new FileInputStream(path));
+        } else {
+          stream = new FileInputStream(path);
+        }
+        break;
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        if (bufferSize == -1) {
+          stream = fs.open(pt);
+        } else {
+          stream = fs.open(pt, bufferSize);
+        }
+        if (gzip) {
+          GzipCodec codec = new GzipCodec();
+          stream = codec.createInputStream(stream);
+        } else if (bzip2) {
+          BZip2Codec codec = new BZip2Codec();
+          stream = codec.createInputStream(stream);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("unsupported file system");
+    }
+    return new DataInputStream(new BufferedInputStream(stream));
+  }
+
+  /**
+   * return the datainputStream which is seek to the offset of file
+   *
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param offset
+   * @return DataInputStream
+   * @throws IOException
+   */
+  public static DataInputStream getDataInputStream(String path, FileType 
fileType, int bufferSize,
+      long offset) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataInputStream stream = fs.open(pt, bufferSize);
+        stream.seek(offset);
+        return new DataInputStream(new BufferedInputStream(stream));
+      default:
+        FileInputStream fis = new FileInputStream(path);
+        long actualSkipSize = 0;
+        long skipSize = offset;
+        while (actualSkipSize != offset) {
+          actualSkipSize += fis.skip(skipSize);
+          skipSize = skipSize - actualSkipSize;
+        }
+        return new DataInputStream(new BufferedInputStream(fis));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType 
fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path)));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = fs.create(pt, true);
+        return stream;
+      default:
+        return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path)));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType 
fileType, int bufferSize,
+      boolean append) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path, append), 
bufferSize));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = null;
+        if (append) {
+          // append to a file only if file already exists else file not found
+          // exception will be thrown by hdfs
+          if (CarbonUtil.isFileExists(path)) {
+            stream = fs.append(pt, bufferSize);
+          } else {
+            stream = fs.create(pt, true, bufferSize);
+          }
+        } else {
+          stream = fs.create(pt, true, bufferSize);
+        }
+        return stream;
+      default:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType 
fileType, int bufferSize,
+      long blockSize) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream =
+            fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), 
blockSize);
+        return stream;
+      default:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+    }
+  }
+
+  /**
+   * This method checks the given path exists or not and also is it file or
+   * not if the performFileCheck is true
+   *
+   * @param filePath         - Path
+   * @param fileType         - FileType Local/HDFS
+   * @param performFileCheck - Provide false for folders, true for files and
+   */
+  public static boolean isFileExist(String filePath, FileType fileType, 
boolean performFileCheck)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        if (performFileCheck) {
+          return fs.exists(path) && fs.isFile(path);
+        } else {
+          return fs.exists(path);
+        }
+
+      case LOCAL:
+      default:
+        File defaultFile = new File(filePath);
+
+        if (performFileCheck) {
+          return defaultFile.exists() && defaultFile.isFile();
+        } else {
+          return defaultFile.exists();
+        }
+    }
+  }
+
+  /**
+   * This method checks the given path exists or not and also is it file or
+   * not if the performFileCheck is true
+   *
+   * @param filePath - Path
+   * @param fileType - FileType Local/HDFS
+   */
+  public static boolean isFileExist(String filePath, FileType fileType) throws 
IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.exists(path);
+
+      case LOCAL:
+      default:
+        File defaultFile = new File(filePath);
+        return defaultFile.exists();
+    }
+  }
+
+  public static boolean createNewFile(String filePath, FileType fileType) 
throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.createNewFile(path);
+
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.createNewFile();
+    }
+  }
+
+  public static boolean deleteFile(String filePath, FileType fileType) throws 
IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.delete(path, true);
+
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return deleteAllFilesOfDir(file);
+    }
+  }
+
+  public static boolean deleteAllFilesOfDir(File path) {
+    if (!path.exists()) {
+      return true;
+    }
+    if (path.isFile()) {
+      return path.delete();
+    }
+    File[] files = path.listFiles();
+    for (int i = 0; i < files.length; i++) {
+      deleteAllFilesOfDir(files[i]);
+    }
+    return path.delete();
+  }
+
+  public static boolean mkdirs(String filePath, FileType fileType) throws 
IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.mkdirs(path);
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.mkdirs();
+    }
+  }
+
+  /**
+   * for getting the dataoutput stream using the hdfs filesystem append API.
+   *
+   * @param path
+   * @param fileType
+   * @return
+   * @throws IOException
+   */
+  public static DataOutputStream getDataOutputStreamUsingAppend(String path, 
FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path, true)));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = fs.append(pt);
+        return stream;
+      default:
+        return new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(path)));
+    }
+  }
+
+  /**
+   * for creating a new Lock file and if it is successfully created
+   * then in case of abrupt shutdown then the stream to that file will be 
closed.
+   *
+   * @param filePath
+   * @param fileType
+   * @return
+   * @throws IOException
+   */
+  public static boolean createNewLockFile(String filePath, FileType fileType) 
throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        if (fs.createNewFile(path)) {
+          fs.deleteOnExit(path);
+          return true;
+        }
+        return false;
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.createNewFile();
+    }
+  }
+
+  public enum FileType {
+    LOCAL, HDFS, ALLUXIO, VIEWFS
+  }
+
+  /**
+   * below method will be used to update the file path
+   * for local type
+   * it removes the file:/ from the path
+   *
+   * @param filePath
+   * @return updated file path without url for local
+   */
+  public static String getUpdatedFilePath(String filePath) {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        return filePath;
+      case LOCAL:
+      default:
+        Path pathWithoutSchemeAndAuthority =
+            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+        return pathWithoutSchemeAndAuthority.toString();
+    }
+  }
+
+  /**
+   * It computes size of directory
+   *
+   * @param filePath
+   * @return size in bytes
+   * @throws IOException
+   */
+  public static long getDirectorySize(String filePath) throws IOException {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.getContentSummary(path).getLength();
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return FileUtils.sizeOfDirectory(file);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java
new file mode 100644
index 0000000..843ec5a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.impl;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.FileHolder;
+
+public class FileHolderImpl implements FileHolder {
+  /**
+   * cache to hold filename and its stream
+   */
+  private Map<String, FileChannel> fileNameAndStreamCache;
+
+  /**
+   * FileHolderImpl Constructor
+   * It will create the cache
+   */
+  public FileHolderImpl() {
+    this.fileNameAndStreamCache =
+        new HashMap<String, 
FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  public FileHolderImpl(int capacity) {
+    this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
+  }
+
+  /**
+   * This method will be used to read the byte array from file based on offset
+   * and length(number of bytes) need to read
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position,
+   * @param length   number of bytes to be read
+   * @return read byte array
+   */
+  @Override public byte[] readByteArray(String filePath, long offset, int 
length)
+      throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, length, offset);
+    return byteBffer.array();
+  }
+
+  /**
+   * This method will be used to close all the streams currently present in 
the cache
+   */
+  @Override public void finish() throws IOException {
+    for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) 
{
+      FileChannel channel = entry.getValue();
+      if (null != channel) {
+        channel.close();
+      }
+    }
+  }
+
+  /**
+   * This method will be used to read int from file from postion(offset), here
+   * length will be always 4 bacause int byte size if 4
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position,
+   * @return read int
+   */
+  @Override public int readInt(String filePath, long offset) throws 
IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, 
CarbonCommonConstants.INT_SIZE_IN_BYTE, offset);
+    return byteBffer.getInt();
+  }
+
+  /**
+   * This method will be used to read int from file from postion(offset), here
+   * length will be always 4 bacause int byte size if 4
+   *
+   * @param filePath fully qualified file path
+   * @return read int
+   */
+  @Override public int readInt(String filePath) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, 
CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    return byteBffer.getInt();
+  }
+
+  /**
+   * This method will be used to read int from file from postion(offset), here
+   * length will be always 4 bacause int byte size if 4
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position,
+   * @return read int
+   */
+  @Override public long readDouble(String filePath, long offset) throws 
IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, 
CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+    return byteBffer.getLong();
+  }
+
+  /**
+   * This method will be used to check whether stream is already present in
+   * cache or not for filepath if not present then create it and then add to
+   * cache, other wise get from cache
+   *
+   * @param filePath fully qualified file path
+   * @return channel
+   */
+  private FileChannel updateCache(String filePath) throws 
FileNotFoundException {
+    FileChannel fileChannel = fileNameAndStreamCache.get(filePath);
+    if (null == fileChannel) {
+      FileInputStream stream = new FileInputStream(filePath);
+      fileChannel = stream.getChannel();
+      fileNameAndStreamCache.put(filePath, fileChannel);
+    }
+    return fileChannel;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be 
read and positon
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @param offset  position
+   * @return byte buffer
+   */
+  private ByteBuffer read(FileChannel channel, int size, long offset) throws 
IOException {
+    ByteBuffer byteBffer = ByteBuffer.allocate(size);
+    channel.position(offset);
+    channel.read(byteBffer);
+    byteBffer.rewind();
+    return byteBffer;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be 
read and positon
+   *
+   * @param channel file channel
+   * @param size    number of bytes
+   * @return byte buffer
+   */
+  private ByteBuffer read(FileChannel channel, int size) throws IOException {
+    ByteBuffer byteBffer = ByteBuffer.allocate(size);
+    channel.read(byteBffer);
+    byteBffer.rewind();
+    return byteBffer;
+  }
+
+
+  /**
+   * This method will be used to read the byte array from file based on 
length(number of bytes)
+   *
+   * @param filePath fully qualified file path
+   * @param length   number of bytes to be read
+   * @return read byte array
+   */
+  @Override public byte[] readByteArray(String filePath, int length) throws 
IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, length);
+    return byteBffer.array();
+  }
+
+  /**
+   * This method will be used to read long from file from postion(offset), here
+   * length will be always 8 bacause int byte size is 8
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position,
+   * @return read long
+   */
+  @Override public long readLong(String filePath, long offset) throws 
IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBffer = read(fileChannel, 
CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+    return byteBffer.getLong();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
new file mode 100644
index 0000000..8573187
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.impl.data.compressed;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.NodeMeasureDataStore;
+import org.apache.carbondata.core.datastorage.compression.WriterCompressModel;
+import 
org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastorage.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public abstract class AbstractHeavyCompressedDoubleArrayDataStore
+    implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
+{
+
+  private LogService LOGGER =
+      
LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName());
+
+  /**
+   * values.
+   */
+  protected ValueCompressionHolder[] values;
+
+  /**
+   * compressionModel.
+   */
+  protected WriterCompressModel compressionModel;
+
+  /**
+   * type
+   */
+  private char[] type;
+
+  /**
+   * AbstractHeavyCompressedDoubleArrayDataStore constructor.
+   *
+   * @param compressionModel
+   */
+  public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel 
compressionModel) {
+    this.compressionModel = compressionModel;
+    if (null != compressionModel) {
+      this.type = compressionModel.getType();
+      values =
+          new 
ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
+    }
+  }
+
+  // this method first invokes encoding routine to encode the data chunk,
+  // followed by invoking compression routine for preparing the data chunk for 
writing.
+  @Override public byte[][] 
getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
+    byte[][] returnValue = new byte[values.length][];
+    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; 
i++) {
+      values[i] = compressionModel.getValueCompressionHolder()[i];
+      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE) {
+        // first perform encoding of the data chunk
+        values[i].setValue(
+            
ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
+                
.getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
+                    compressionModel.getMaxValue()[i],
+                    compressionModel.getMantissa()[i]));
+      } else {
+        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
+      }
+      values[i].compress();
+      returnValue[i] = values[i].getCompressedData();
+    }
+
+    return returnValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
new file mode 100644
index 0000000..ea42beb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastorage.impl.data.compressed;
+
+import org.apache.carbondata.core.datastorage.compression.WriterCompressModel;
+
+public class HeavyCompressedDoubleArrayDataInMemoryStore
+    extends AbstractHeavyCompressedDoubleArrayDataStore {
+
+  public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel 
compressionModel) {
+    super(compressionModel);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/FileHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/FileHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/FileHolder.java
deleted file mode 100644
index 1f99158..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/FileHolder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store;
-
-import java.io.IOException;
-
-public interface FileHolder {
-  /**
-   * This method will be used to read the byte array from file based on offset
-   * and length(number of bytes) need to read
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @param length   number of bytes to be read
-   * @return read byte array
-   */
-  byte[] readByteArray(String filePath, long offset, int length) throws 
IOException;
-
-  /**
-   * This method will be used to read the byte array from file based on 
length(number of bytes)
-   *
-   * @param filePath fully qualified file path
-   * @param length   number of bytes to be read
-   * @return read byte array
-   */
-  byte[] readByteArray(String filePath, int length) throws IOException;
-
-  /**
-   * This method will be used to read int from file from postion(offset), here
-   * length will be always 4 bacause int byte size if 4
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read int
-   */
-  int readInt(String filePath, long offset) throws IOException;
-
-  /**
-   * This method will be used to read long from file from postion(offset), here
-   * length will be always 8 bacause int byte size is 8
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read long
-   */
-  long readLong(String filePath, long offset) throws IOException;
-
-  /**
-   * This method will be used to read int from file from postion(offset), here
-   * length will be always 4 bacause int byte size if 4
-   *
-   * @param filePath fully qualified file path
-   * @return read int
-   */
-  int readInt(String filePath) throws IOException;
-
-  /**
-   * This method will be used to read long value from file from 
postion(offset), here
-   * length will be always 8 because long byte size if 4
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read long
-   */
-  long readDouble(String filePath, long offset) throws IOException;
-
-  /**
-   * This method will be used to close all the streams currently present in 
the cache
-   */
-  void finish() throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/MeasureDataWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/MeasureDataWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/MeasureDataWrapper.java
deleted file mode 100644
index 80a4374..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/MeasureDataWrapper.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store;
-
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-
-/**
- * MeasureDataWrapper, interface.
- */
-public interface MeasureDataWrapper {
-  CarbonReadDataHolder[] getValues();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/NodeMeasureDataStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/NodeMeasureDataStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/NodeMeasureDataStore.java
deleted file mode 100644
index e40dadd..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/NodeMeasureDataStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store;
-
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
-
-public interface NodeMeasureDataStore {
-  /**
-   * This method will be used to get the writable key array.
-   * writable measure data array will hold below information:
-   * <size of measure data array><measure data array>
-   * total length will be 4 bytes for size + measure data array length
-   *
-   * @return writable array (compressed or normal)
-   */
-  byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] 
dataHolderArray);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForInt.java
deleted file mode 100644
index 013d873..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForInt.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class BlockIndexerStorageForInt implements IndexStorage<int[]> {
-  private boolean alreadySorted;
-
-  private int[] dataAfterComp;
-
-  private int[] indexMap;
-
-  private byte[][] keyBlock;
-
-  private int[] dataIndexMap;
-
-  private int totalSize;
-
-  public BlockIndexerStorageForInt(byte[][] keyBlock, boolean compressData, 
boolean isNoDictionary,
-      boolean isSortRequired) {
-    ColumnWithIntIndex[] columnWithIndexs = 
createColumnWithIndexArray(keyBlock, isNoDictionary);
-    if (isSortRequired) {
-      Arrays.sort(columnWithIndexs);
-    }
-    compressMyOwnWay(extractDataAndReturnIndexes(columnWithIndexs, keyBlock));
-    if (compressData) {
-      compressDataMyOwnWay(columnWithIndexs);
-    }
-  }
-
-  /**
-   * Create an object with each column array and respective index
-   *
-   * @return
-   */
-  private ColumnWithIntIndex[] createColumnWithIndexArray(byte[][] keyBlock,
-      boolean isNoDictionary) {
-    ColumnWithIntIndex[] columnWithIndexs;
-    if (isNoDictionary) {
-      columnWithIndexs = new ColumnWithIntIndexForHighCard[keyBlock.length];
-      for (int i = 0; i < columnWithIndexs.length; i++) {
-        columnWithIndexs[i] = new ColumnWithIntIndexForHighCard(keyBlock[i], 
i);
-      }
-
-    } else {
-      columnWithIndexs = new ColumnWithIntIndex[keyBlock.length];
-      for (int i = 0; i < columnWithIndexs.length; i++) {
-        columnWithIndexs[i] = new ColumnWithIntIndex(keyBlock[i], i);
-      }
-    }
-
-    return columnWithIndexs;
-  }
-
-  private int[] extractDataAndReturnIndexes(ColumnWithIntIndex[] 
columnWithIndexs,
-      byte[][] keyBlock) {
-    int[] indexes = new int[columnWithIndexs.length];
-    for (int i = 0; i < indexes.length; i++) {
-      indexes[i] = columnWithIndexs[i].getIndex();
-      keyBlock[i] = columnWithIndexs[i].getColumn();
-    }
-    this.keyBlock = keyBlock;
-    return indexes;
-  }
-
-  /**
-   * It compresses depends up on the sequence numbers.
-   * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]. In
-   * first array the start and end of sequential numbers and second array
-   * keeps the indexes of where sequential numbers starts. If there is no
-   * sequential numbers then the same array it returns with empty second
-   * array.
-   *
-   * @param indexes
-   */
-  public void compressMyOwnWay(int[] indexes) {
-    List<Integer> list = new 
ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    List<Integer> map = new 
ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    int k = 0;
-    int i = 1;
-    for (; i < indexes.length; i++) {
-      if (indexes[i] - indexes[i - 1] == 1) {
-        k++;
-      } else {
-        if (k > 0) {
-          map.add((list.size()));
-          list.add(indexes[i - k - 1]);
-          list.add(indexes[i - 1]);
-        } else {
-          list.add(indexes[i - 1]);
-        }
-        k = 0;
-      }
-    }
-    if (k > 0) {
-      map.add((list.size()));
-      list.add(indexes[i - k - 1]);
-      list.add(indexes[i - 1]);
-    } else {
-      list.add(indexes[i - 1]);
-    }
-    dataAfterComp = convertToArray(list);
-    if (indexes.length == dataAfterComp.length) {
-      indexMap = new int[0];
-    } else {
-      indexMap = convertToArray(map);
-    }
-    if (dataAfterComp.length == 2 && indexMap.length == 1) {
-      alreadySorted = true;
-    }
-  }
-
-  private int[] convertToArray(List<Integer> list) {
-    int[] shortArray = new int[list.size()];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  /**
-   * @return the alreadySorted
-   */
-  public boolean isAlreadySorted() {
-    return alreadySorted;
-  }
-
-  /**
-   * @return the dataAfterComp
-   */
-  public int[] getDataAfterComp() {
-    return dataAfterComp;
-  }
-
-  /**
-   * @return the indexMap
-   */
-  public int[] getIndexMap() {
-    return indexMap;
-  }
-
-  /**
-   * @return the keyBlock
-   */
-  public byte[][] getKeyBlock() {
-    return keyBlock;
-  }
-
-  private void compressDataMyOwnWay(ColumnWithIntIndex[] indexes) {
-    byte[] prvKey = indexes[0].getColumn();
-    List<ColumnWithIntIndex> list =
-        new 
ArrayList<ColumnWithIntIndex>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    list.add(indexes[0]);
-    int counter = 1;
-    int start = 0;
-    List<Integer> map = new 
ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    for (int i = 1; i < indexes.length; i++) {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, 
indexes[i].getColumn()) != 0) {
-        prvKey = indexes[i].getColumn();
-        list.add(indexes[i]);
-        map.add(start);
-        map.add(counter);
-        start += counter;
-        counter = 1;
-        continue;
-      }
-      counter++;
-    }
-    map.add(start);
-    map.add(counter);
-    this.keyBlock = convertToKeyArray(list);
-    if (indexes.length == keyBlock.length) {
-      dataIndexMap = new int[0];
-    } else {
-      dataIndexMap = convertToArray(map);
-    }
-  }
-
-  private byte[][] convertToKeyArray(List<ColumnWithIntIndex> list) {
-    byte[][] shortArray = new byte[list.size()][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i).getColumn();
-      totalSize += shortArray[i].length;
-    }
-    return shortArray;
-  }
-
-  @Override public int[] getDataIndexMap() {
-    return dataIndexMap;
-  }
-
-  @Override public int getTotalSize() {
-    return totalSize;
-  }
-
-  @Override public byte[] getMin() {
-    return keyBlock[0];
-  }
-
-  @Override public byte[] getMax() {
-    return keyBlock[keyBlock.length - 1];
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForNoInvertedIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForNoInvertedIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForNoInvertedIndex.java
deleted file mode 100644
index c7d43cf..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/BlockIndexerStorageForNoInvertedIndex.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class BlockIndexerStorageForNoInvertedIndex implements 
IndexStorage<int[]> {
-  private byte[][] keyBlock;
-  private byte[][] sortedBlock;
-  private int totalSize;
-  private int[] dataIndexMap;
-
-  public BlockIndexerStorageForNoInvertedIndex(byte[][] keyBlockInput, boolean 
compressData,
-      boolean isNoDictionary) {
-    // without invertedindex but can be RLE
-    if (compressData) {
-      // with RLE
-      byte[] prvKey = keyBlockInput[0];
-      List<byte[]> list = new 
ArrayList<byte[]>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      list.add(keyBlockInput[0]);
-      int counter = 1;
-      int start = 0;
-      List<Integer> map = new 
ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      int length = keyBlockInput.length;
-      for(int i = 1; i < length; i++) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, 
keyBlockInput[i]) != 0) {
-          prvKey = keyBlockInput[i];
-          list.add(keyBlockInput[i]);
-          map.add(start);
-          map.add(counter);
-          start += counter;
-          counter = 1;
-          continue;
-        }
-        counter++;
-      }
-      map.add(start);
-      map.add(counter);
-      this.keyBlock = convertToKeyArray(list);
-      if (keyBlockInput.length == this.keyBlock.length) {
-        dataIndexMap = new int[0];
-      } else {
-        dataIndexMap = convertToArray(map);
-      }
-    } else {
-      this.keyBlock = keyBlockInput;
-      dataIndexMap = new int[0];
-    }
-
-    this.sortedBlock = new byte[keyBlock.length][];
-    System.arraycopy(keyBlock, 0, sortedBlock, 0, keyBlock.length);
-    if (isNoDictionary) {
-      Arrays.sort(sortedBlock, new Comparator<byte[]>() {
-        @Override
-        public int compare(byte[] col1, byte[] col2) {
-          return ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(col1, 2, col1.length - 2, col2, 2, col2.length - 2);
-        }
-      });
-    } else {
-      Arrays.sort(sortedBlock, new Comparator<byte[]>() {
-        @Override
-        public int compare(byte[] col1, byte[] col2) {
-          return ByteUtil.UnsafeComparer.INSTANCE.compareTo(col1, col2);
-        }
-      });
-    }
-
-  }
-
-  private int[] convertToArray(List<Integer> list) {
-    int[] shortArray = new int[list.size()];
-    for(int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-    }
-    return shortArray;
-  }
-
-  private byte[][] convertToKeyArray(List<byte[]> list) {
-    byte[][] shortArray = new byte[list.size()][];
-    for (int i = 0; i < shortArray.length; i++) {
-      shortArray[i] = list.get(i);
-      totalSize += shortArray[i].length;
-    }
-    return shortArray;
-  }
-
-  @Override
-  public int[] getDataIndexMap() {
-    return dataIndexMap;
-  }
-
-  @Override
-  public int getTotalSize() {
-    return totalSize;
-  }
-
-  @Override
-  public boolean isAlreadySorted() {
-    return true;
-  }
-
-  /**
-   * no use
-   * @return
-   */
-  @Override
-  public int[] getDataAfterComp() {
-    return new int[0];
-  }
-
-  /**
-   * no use
-   * @return
-   */
-  @Override
-  public int[] getIndexMap() {
-    return new int[0];
-  }
-
-  /**
-   * @return the keyBlock
-   */
-  public byte[][] getKeyBlock() {
-    return keyBlock;
-  }
-
-  @Override public byte[] getMin() {
-    return sortedBlock[0];
-  }
-
-  @Override public byte[] getMax() {
-    return sortedBlock[sortedBlock.length - 1];
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnGroupModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnGroupModel.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnGroupModel.java
deleted file mode 100644
index cf9ba40..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnGroupModel.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-public class ColumnGroupModel {
-
-  /**
-   * number of columns in columnar block
-   */
-  private int[] columnSplit;
-
-  /**
-   * total number of columns
-   */
-  private int noOfColumnsStore;
-
-  /**
-   * whether given index is columnar or not
-   * true: columnar
-   * false: row block
-   */
-  private boolean[] columnarStore;
-
-  /**
-   * column groups
-   * e.g
-   * {{0,1,2},3,4,{5,6}}
-   */
-  private int[][] columnGroups;
-
-  /**
-   * return columnSplit
-   *
-   * @return
-   */
-  public int[] getColumnSplit() {
-    return columnSplit;
-  }
-
-  /**
-   * set columnSplit
-   *
-   * @param split
-   */
-  public void setColumnSplit(int[] split) {
-    this.columnSplit = split;
-  }
-
-  /**
-   * @return no of columnar block
-   */
-  public int getNoOfColumnStore() {
-    return this.noOfColumnsStore;
-  }
-
-  /**
-   * set no of columnar block
-   *
-   * @param noOfColumnsStore
-   */
-  public void setNoOfColumnStore(int noOfColumnsStore) {
-    this.noOfColumnsStore = noOfColumnsStore;
-  }
-
-  /**
-   * it's an identifier for row block or single column block
-   *
-   * @param columnarStore
-   */
-  public void setColumnarStore(boolean[] columnarStore) {
-    this.columnarStore = columnarStore;
-  }
-
-  /**
-   * set column groups
-   *
-   * @param columnGroups
-   */
-  public void setColumnGroup(int[][] columnGroups) {
-    this.columnGroups = columnGroups;
-  }
-
-  /**
-   * check if given column group is columnar
-   *
-   * @param colGroup
-   * @return true if given block is columnar
-   */
-  public boolean isColumnar(int colGroup) {
-    return columnarStore[colGroup];
-  }
-
-  /**
-   * @return columngroups
-   */
-  public int[][] getColumnGroup() {
-    return this.columnGroups;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndex.java
deleted file mode 100644
index 36606a5..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.util.Arrays;
-
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class ColumnWithIntIndex implements Comparable<ColumnWithIntIndex> {
-  protected byte[] column;
-
-  private int index;
-
-  public ColumnWithIntIndex(byte[] column, int index) {
-    this.column = column;
-    this.index = index;
-  }
-
-  public ColumnWithIntIndex() {
-  }
-
-  /**
-   * @return the column
-   */
-  public byte[] getColumn() {
-    return column;
-  }
-
-  /**
-   * @param column the column to set
-   */
-  public void setColumn(byte[] column) {
-    this.column = column;
-  }
-
-  /**
-   * @return the index
-   */
-  public int getIndex() {
-    return index;
-  }
-
-  /**
-   * @param index the index to set
-   */
-  public void setIndex(int index) {
-    this.index = index;
-  }
-
-  @Override public int compareTo(ColumnWithIntIndex o) {
-    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(column, o.column);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    ColumnWithIntIndex o = (ColumnWithIntIndex)obj;
-    return Arrays.equals(column, o.column) && index == o.index;
-  }
-
-  @Override public int hashCode() {
-    return Arrays.hashCode(column) + index;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndexForHighCard.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndexForHighCard.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndexForHighCard.java
deleted file mode 100644
index 61a1165..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnWithIntIndexForHighCard.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.util.Arrays;
-
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-
-public class ColumnWithIntIndexForHighCard extends ColumnWithIntIndex
-    implements Comparable<ColumnWithIntIndex> {
-
-  public ColumnWithIntIndexForHighCard(byte[] column, int index) {
-    super(column, index);
-  }
-
-  @Override public int compareTo(ColumnWithIntIndex o) {
-    return UnsafeComparer.INSTANCE
-        .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length 
- 2);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    ColumnWithIntIndexForHighCard o = (ColumnWithIntIndexForHighCard)obj;
-    return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
-  }
-
-  @Override public int hashCode() {
-    return Arrays.hashCode(column) + getIndex();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreDataHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreDataHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreDataHolder.java
deleted file mode 100644
index 29887a3..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreDataHolder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.nio.ByteBuffer;
-
-public class ColumnarKeyStoreDataHolder {
-  private byte[] keyblockData;
-  private ColumnarKeyStoreMetadata columnarKeyStoreMetadata;
-
-  public ColumnarKeyStoreDataHolder(final byte[] keyblockData,
-      final ColumnarKeyStoreMetadata columnarKeyStoreMetadata) {
-    this.keyblockData = keyblockData;
-    this.columnarKeyStoreMetadata = columnarKeyStoreMetadata;
-  }
-
-  public ColumnarKeyStoreDataHolder(final ColumnarKeyStoreMetadata 
columnarKeyStoreMetadata) {
-    this.columnarKeyStoreMetadata = columnarKeyStoreMetadata;
-  }
-
-  public int getSurrogateKey(int columnIndex) {
-    byte[] actual = new byte[4];
-    int startIndex;
-    if (null != columnarKeyStoreMetadata.getColumnReverseIndex()) {
-      startIndex =
-          columnarKeyStoreMetadata.getColumnReverseIndex()[columnIndex] * 
columnarKeyStoreMetadata
-              .getEachRowSize();
-    } else {
-      startIndex = columnIndex * columnarKeyStoreMetadata.getEachRowSize();
-    }
-    int destPos = 4 - columnarKeyStoreMetadata.getEachRowSize();
-    System.arraycopy(keyblockData, startIndex, actual, destPos,
-        columnarKeyStoreMetadata.getEachRowSize());
-    return ByteBuffer.wrap(actual).getInt();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreMetadata.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreMetadata.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreMetadata.java
deleted file mode 100644
index 7754ddb..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/ColumnarKeyStoreMetadata.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-class ColumnarKeyStoreMetadata {
-
-  private int[] columnReverseIndex;
-
-  private int eachRowSize;
-
-  ColumnarKeyStoreMetadata(int eachRowSize) {
-    this.eachRowSize = eachRowSize;
-  }
-
-  /**
-   * @return the eachRowSize
-   */
-  int getEachRowSize() {
-    return eachRowSize;
-  }
-
-  /**
-   * @return the columnReverseIndex
-   */
-  int[] getColumnReverseIndex() {
-    return columnReverseIndex;
-  }
-
-  /**
-   * @param columnReverseIndex the columnReverseIndex to set
-   */
-  void setColumnReverseIndex(int[] columnReverseIndex) {
-    this.columnReverseIndex = columnReverseIndex;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/IndexStorage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/IndexStorage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/IndexStorage.java
deleted file mode 100644
index e1f4548..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/IndexStorage.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-public interface IndexStorage<T> {
-  boolean isAlreadySorted();
-
-  T getDataAfterComp();
-
-  T getIndexMap();
-
-  byte[][] getKeyBlock();
-
-  T getDataIndexMap();
-
-  int getTotalSize();
-
-  /**
-   * @return min value of block
-   */
-  byte[] getMin();
-
-  /**
-   * @return max value of block
-   */
-  byte[] getMax();
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/UnBlockIndexer.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/UnBlockIndexer.java
deleted file mode 100644
index 149facb..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/columnar/UnBlockIndexer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.columnar;
-
-import java.util.Arrays;
-
-public final class UnBlockIndexer {
-
-  private UnBlockIndexer() {
-
-  }
-
-  public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
-    int actualSize = indexData.length;
-    for (int i = 0; i < indexMap.length; i++) {
-      actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
-    }
-    int[] indexes = new int[actualSize];
-    int k = 0;
-    for (int i = 0; i < indexData.length; i++) {
-      int index = Arrays.binarySearch(indexMap, i);
-      if (index > -1) {
-        for (int j = indexData[indexMap[index]]; j <= 
indexData[indexMap[index] + 1]; j++) {
-          indexes[k] = j;
-          k++;
-        }
-        i++;
-      } else {
-        indexes[k] = indexData[i];
-        k++;
-      }
-    }
-    return indexes;
-  }
-
-  public static byte[] uncompressData(byte[] data, int[] index, int keyLen) {
-    if (index.length < 1) {
-      return data;
-    }
-    int numberOfCopy = 0;
-    int actualSize = 0;
-    int srcPos = 0;
-    int destPos = 0;
-    for (int i = 1; i < index.length; i += 2) {
-      actualSize += index[i];
-    }
-    byte[] uncompressedData = new byte[actualSize * keyLen];
-    int picIndex = 0;
-    for (int i = 0; i < data.length; i += keyLen) {
-      numberOfCopy = index[picIndex * 2 + 1];
-      picIndex++;
-      for (int j = 0; j < numberOfCopy; j++) {
-        System.arraycopy(data, srcPos, uncompressedData, destPos, keyLen);
-        destPos += keyLen;
-      }
-      srcPos += keyLen;
-    }
-    return uncompressedData;
-  }
-
-}


Reply via email to