http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
deleted file mode 100644
index 9ff0f59..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-
-
-public class AlluxioCarbonFile extends AbstractDFSCarbonFile {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName());
-
-  public AlluxioCarbonFile(String filePath) {
-    super(filePath);
-  }
-
-  public AlluxioCarbonFile(Path path) {
-    super(path);
-  }
-
-  public AlluxioCarbonFile(FileStatus fileStatus) {
-    super(fileStatus);
-  }
-
-  /**
-   * @param listStatus file statuses returned by the underlying file system
-   * @return the statuses wrapped as CarbonFile instances
-   */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
-    if (listStatus == null) {
-      return new CarbonFile[0];
-    }
-    CarbonFile[] files = new CarbonFile[listStatus.length];
-    for (int i = 0; i < files.length; i++) {
-      files[i] = new AlluxioCarbonFile(listStatus[i]);
-    }
-    return files;
-  }
-
-  @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return null;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
-
-  @Override
-  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
-    CarbonFile[] files = listFiles();
-    if (files != null && files.length >= 1) {
-      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
-      for (int i = 0; i < files.length; i++) {
-        if (fileFilter.accept(files[i])) {
-          fileList.add(files[i]);
-        }
-      }
-      if (fileList.size() >= 1) {
-        return fileList.toArray(new CarbonFile[fileList.size()]);
-      } else {
-        return new CarbonFile[0];
-      }
-    }
-    return files;
-  }
-
-  @Override
-  public CarbonFile getParentFile() {
-    Path parent = fileStatus.getPath().getParent();
-    return null == parent ? null : new AlluxioCarbonFile(parent);
-  }
-
-  @Override
-  public boolean renameForce(String changetoName) {
-    FileSystem fs;
-    try {
-      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-      if (fs instanceof DistributedFileSystem) {
-        ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
-            org.apache.hadoop.fs.Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        return false;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return false;
-    }
-  }
-}

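For context, listFiles() above is deliberately tri-state: null means the path is not a directory, an empty array means the listing failed or there are no children, and otherwise the statuses are wrapped. A minimal caller sketch (the Alluxio store path is hypothetical and assumes the usual alluxio:// prefix handled by FileFactory):

    import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastorage.store.impl.FileFactory;

    public class ListFilesSketch {
      public static void main(String[] args) {
        String storePath = "alluxio://localhost:19998/carbon/store"; // hypothetical
        CarbonFile dir = FileFactory.getCarbonFile(storePath, FileFactory.getFileType(storePath));
        CarbonFile[] children = dir.listFiles();
        if (children == null) {
          System.out.println(storePath + " is not a directory"); // tri-state: null
        } else {
          for (CarbonFile child : children) { // empty array: listing error or no children
            System.out.println(child.getName());
          }
        }
      }
    }
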
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
deleted file mode 100644
index 642055b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-public interface CarbonFile {
-
-  String getAbsolutePath();
-
-  CarbonFile[] listFiles(CarbonFileFilter fileFilter);
-
-  CarbonFile[] listFiles();
-
-  String getName();
-
-  boolean isDirectory();
-
-  boolean exists();
-
-  String getCanonicalPath();
-
-  CarbonFile getParentFile();
-
-  String getPath();
-
-  long getSize();
-
-  boolean renameTo(String changetoName);
-
-  boolean renameForce(String changetoName);
-
-  boolean delete();
-
-  boolean createNewFile();
-
-  long getLastModifiedTime();
-
-  boolean setLastModifiedTime(long timestamp);
-
-  boolean truncate(String fileName, long validDataEndOffset);
-
-  /**
-   * This method will be used to check whether a file has been modified or not
-   *
-   * @param fileTimeStamp time to be compared with latest timestamp of file
-   * @param endOffset     file length to be compared with current length of file
-   * @return true if the file has been modified, false otherwise
-   */
-  boolean isFileModified(long fileTimeStamp, long endOffset);
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
deleted file mode 100644
index 7db3b2b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-public interface CarbonFileFilter {
-  boolean accept(CarbonFile file);
-}

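A minimal sketch of how this one-method callback is consumed by CarbonFile.listFiles(CarbonFileFilter); the store path and the ".carbondata" suffix are illustrative only, and the anonymous class matches the Java 7 era of this code base:

    import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
    import org.apache.carbondata.core.datastorage.store.impl.FileFactory;

    public class FilterSketch {
      public static void main(String[] args) {
        String dirPath = "hdfs://localhost:8020/carbon/store"; // hypothetical
        CarbonFile dir = FileFactory.getCarbonFile(dirPath, FileFactory.getFileType(dirPath));
        CarbonFile[] dataFiles = dir.listFiles(new CarbonFileFilter() {
          @Override public boolean accept(CarbonFile file) {
            return file.getName().endsWith(".carbondata"); // illustrative suffix
          }
        });
        if (dataFiles != null) { // null when dirPath is not a directory
          System.out.println(dataFiles.length + " matching files");
        }
      }
    }
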
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
deleted file mode 100644
index ebe18e4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-public class HDFSCarbonFile extends AbstractDFSCarbonFile {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
-
-  public HDFSCarbonFile(String filePath) {
-    super(filePath);
-  }
-
-  public HDFSCarbonFile(Path path) {
-    super(path);
-  }
-
-  public HDFSCarbonFile(FileStatus fileStatus) {
-    super(fileStatus);
-  }
-
-  /**
-   * @param listStatus file statuses returned by the underlying file system
-   * @return the statuses wrapped as CarbonFile instances
-   */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
-    if (listStatus == null) {
-      return new CarbonFile[0];
-    }
-    CarbonFile[] files = new CarbonFile[listStatus.length];
-    for (int i = 0; i < files.length; i++) {
-      files[i] = new HDFSCarbonFile(listStatus[i]);
-    }
-    return files;
-  }
-
-  @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return null;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
-
-  @Override
-  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
-    CarbonFile[] files = listFiles();
-    if (files != null && files.length >= 1) {
-      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
-      for (int i = 0; i < files.length; i++) {
-        if (fileFilter.accept(files[i])) {
-          fileList.add(files[i]);
-        }
-      }
-      if (fileList.size() >= 1) {
-        return fileList.toArray(new CarbonFile[fileList.size()]);
-      } else {
-        return new CarbonFile[0];
-      }
-    }
-    return files;
-  }
-
-  @Override
-  public CarbonFile getParentFile() {
-    Path parent = fileStatus.getPath().getParent();
-    return null == parent ? null : new HDFSCarbonFile(parent);
-  }
-
-  @Override
-  public boolean renameForce(String changetoName) {
-    FileSystem fs;
-    try {
-      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-      if (fs instanceof DistributedFileSystem) {
-        ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
-            org.apache.hadoop.fs.Options.Rename.OVERWRITE);
-        return true;
-      } else {
-        return false;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return false;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
deleted file mode 100644
index 406f6d1..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.fs.Path;
-
-public class LocalCarbonFile implements CarbonFile {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LocalCarbonFile.class.getName());
-  private File file;
-
-  public LocalCarbonFile(String filePath) {
-    Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-    file = new File(pathWithoutSchemeAndAuthority.toString());
-  }
-
-  public LocalCarbonFile(File file) {
-    this.file = file;
-  }
-
-  @Override public String getAbsolutePath() {
-    return file.getAbsolutePath();
-  }
-
-  @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
-    if (!file.isDirectory()) {
-      return null;
-    }
-
-    File[] files = file.listFiles(new FileFilter() {
-
-      @Override public boolean accept(File pathname) {
-        return fileFilter.accept(new LocalCarbonFile(pathname));
-      }
-    });
-
-    if (files == null) {
-      return new CarbonFile[0];
-    }
-
-    CarbonFile[] carbonFiles = new CarbonFile[files.length];
-
-    for (int i = 0; i < carbonFiles.length; i++) {
-      carbonFiles[i] = new LocalCarbonFile(files[i]);
-    }
-
-    return carbonFiles;
-  }
-
-  @Override public String getName() {
-    return file.getName();
-  }
-
-  @Override public boolean isDirectory() {
-    return file.isDirectory();
-  }
-
-  @Override public boolean exists() {
-    if (file != null) {
-      return file.exists();
-    }
-    return false;
-  }
-
-  @Override public String getCanonicalPath() {
-    try {
-      return file.getCanonicalPath();
-    } catch (IOException e) {
-      LOGGER
-          .error(e, "Exception occurred: " + e.getMessage());
-    }
-    return null;
-  }
-
-  @Override public CarbonFile getParentFile() {
-    return new LocalCarbonFile(file.getParentFile());
-  }
-
-  @Override public String getPath() {
-    return file.getPath();
-  }
-
-  @Override public long getSize() {
-    return file.length();
-  }
-
-  public boolean renameTo(String changetoName) {
-    return file.renameTo(new File(changetoName));
-  }
-
-  public boolean delete() {
-    return file.delete();
-  }
-
-  @Override public CarbonFile[] listFiles() {
-
-    if (!file.isDirectory()) {
-      return null;
-    }
-    File[] files = file.listFiles();
-    if (files == null) {
-      return new CarbonFile[0];
-    }
-    CarbonFile[] carbonFiles = new CarbonFile[files.length];
-    for (int i = 0; i < carbonFiles.length; i++) {
-      carbonFiles[i] = new LocalCarbonFile(files[i]);
-    }
-
-    return carbonFiles;
-
-  }
-
-  @Override public boolean createNewFile() {
-    try {
-      return file.createNewFile();
-    } catch (IOException e) {
-      return false;
-    }
-  }
-
-  @Override public long getLastModifiedTime() {
-    return file.lastModified();
-  }
-
-  @Override public boolean setLastModifiedTime(long timestamp) {
-    return file.setLastModified(timestamp);
-  }
-
-  /**
-   * This method will delete the data in the file from a given offset
-   */
-  @Override public boolean truncate(String fileName, long validDataEndOffset) {
-    FileChannel source = null;
-    FileChannel destination = null;
-    boolean fileTruncatedSuccessfully = false;
-    // temporary file name
-    String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
-    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    try {
-      CarbonFile tempFile = null;
-      // delete temporary file if it already exists at a given path
-      if (FileFactory.isFileExist(tempWriteFilePath, fileType)) {
-        tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-        tempFile.delete();
-      }
-      // create new temporary file
-      FileFactory.createNewFile(tempWriteFilePath, fileType);
-      tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-      source = new FileInputStream(fileName).getChannel();
-      destination = new FileOutputStream(tempWriteFilePath).getChannel();
-      long read = destination.transferFrom(source, 0, validDataEndOffset);
-      long totalBytesRead = read;
-      long remaining = validDataEndOffset - totalBytesRead;
-      // read until the required data offset is reached
-      while (remaining > 0) {
-        read = destination.transferFrom(source, totalBytesRead, remaining);
-        totalBytesRead = totalBytesRead + read;
-        remaining = validDataEndOffset - totalBytesRead; // recompute against the target offset
-      }
-      CarbonUtil.closeStreams(source, destination);
-      // rename the temp file to original file
-      tempFile.renameForce(fileName);
-      fileTruncatedSuccessfully = true;
-    } catch (IOException e) {
-      LOGGER.error("Exception occured while truncating the file " + 
e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(source, destination);
-    }
-    return fileTruncatedSuccessfully;
-  }
-
-  /**
-   * This method will be used to check whether a file has been modified or not
-   *
-   * @param fileTimeStamp time to be compared with latest timestamp of file
-   * @param endOffset     file length to be compared with current length of file
-   * @return true if the file has been modified, false otherwise
-   */
-  @Override public boolean isFileModified(long fileTimeStamp, long endOffset) {
-    boolean isFileModified = false;
-    if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
-      isFileModified = true;
-    }
-    return isFileModified;
-  }
-
-  @Override public boolean renameForce(String changetoName) {
-    File destFile = new File(changetoName);
-    if (destFile.exists()) {
-      if (destFile.delete()) {
-        return file.renameTo(new File(changetoName));
-      }
-    }
-
-    return file.renameTo(new File(changetoName));
-
-  }
-
-}

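truncate() above keeps the first validDataEndOffset bytes by copying them into a temp file and renaming it over the original. A usage sketch with a hypothetical path and offset:

    import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;

    public class TruncateSketch {
      public static void main(String[] args) {
        String dataFile = "/tmp/part-0.carbondata"; // hypothetical file
        CarbonFile file = new LocalCarbonFile(dataFile);
        // Keep only the first 4096 bytes, e.g. up to the last known-good write offset.
        boolean truncated = file.truncate(dataFile, 4096L);
        System.out.println("truncated: " + truncated);
      }
    }
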
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
deleted file mode 100644
index 8f11b7a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.datastorage.store.filesystem;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.viewfs.ViewFileSystem;
-
-public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName());
-
-  public ViewFSCarbonFile(String filePath) {
-    super(filePath);
-  }
-
-  public ViewFSCarbonFile(Path path) {
-    super(path);
-  }
-
-  public ViewFSCarbonFile(FileStatus fileStatus) {
-    super(fileStatus);
-  }
-
-  /**
-   * @param listStatus file statuses returned by the underlying file system
-   * @return the statuses wrapped as CarbonFile instances
-   */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
-    if (listStatus == null) {
-      return new CarbonFile[0];
-    }
-    CarbonFile[] files = new CarbonFile[listStatus.length];
-    for (int i = 0; i < files.length; i++) {
-      files[i] = new ViewFSCarbonFile(listStatus[i]);
-    }
-    return files;
-  }
-
-  @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return null;
-      }
-    } catch (IOException ex) {
-      LOGGER.error("Exception occured" + ex.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
-
-  @Override
-  public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
-    CarbonFile[] files = listFiles();
-    if (files != null && files.length >= 1) {
-      List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length);
-      for (int i = 0; i < files.length; i++) {
-        if (fileFilter.accept(files[i])) {
-          fileList.add(files[i]);
-        }
-      }
-      if (fileList.size() >= 1) {
-        return fileList.toArray(new CarbonFile[fileList.size()]);
-      } else {
-        return new CarbonFile[0];
-      }
-    }
-    return files;
-  }
-
-  @Override public CarbonFile getParentFile() {
-    Path parent = fileStatus.getPath().getParent();
-    return null == parent ? null : new ViewFSCarbonFile(parent);
-  }
-
-  @Override
-  public boolean renameForce(String changetoName) {
-    FileSystem fs;
-    try {
-      fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
-      if (fs instanceof ViewFileSystem) {
-        fs.delete(new Path(changetoName), true);
-        fs.rename(fileStatus.getPath(), new Path(changetoName));
-        return true;
-      } else {
-        return false;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured" + e.getMessage());
-      return false;
-    }
-  }
-}

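Note the contrast with HDFSCarbonFile.renameForce() above: ViewFileSystem offers no Options.Rename.OVERWRITE, so the ViewFS variant emulates an overwriting rename with delete-then-rename, which is not atomic. A side-by-side sketch of the two code paths (the helper name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.viewfs.ViewFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RenameSketch {
      static boolean forceRename(FileSystem fs, Path src, Path dest) throws IOException {
        if (fs instanceof DistributedFileSystem) {
          // HDFS: one atomic rename with overwrite semantics.
          ((DistributedFileSystem) fs).rename(src, dest, Options.Rename.OVERWRITE);
          return true;
        } else if (fs instanceof ViewFileSystem) {
          // ViewFS: no overwrite flag, so delete the target first (non-atomic).
          fs.delete(dest, true);
          return fs.rename(src, dest);
        }
        return false;
      }
    }
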
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
deleted file mode 100644
index c9571b2..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.impl;
-
-import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper;
-import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-
-public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper {
-
-  private final CarbonReadDataHolder[] values;
-
-  public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) {
-    this.values = values;
-  }
-
-  @Override public CarbonReadDataHolder[] getValues() {
-    return values;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
deleted file mode 100644
index 6e2f5d7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.datastorage.store.impl;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-public class DFSFileHolderImpl implements FileHolder {
-  /**
-   * cache to hold filename and its stream
-   */
-  private Map<String, FSDataInputStream> fileNameAndStreamCache;
-
-  public DFSFileHolderImpl() {
-    this.fileNameAndStreamCache =
-        new HashMap<String, 
FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  @Override public byte[] readByteArray(String filePath, long offset, int length)
-      throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    return read(fileChannel, length, offset);
-  }
-
-  /**
-   * This method will be used to check whether a stream for the file path is
-   * already present in the cache; if not, it is created and added to the
-   * cache, otherwise it is fetched from the cache
-   *
-   * @param filePath fully qualified file path
-   * @return channel
-   */
-  private FSDataInputStream updateCache(String filePath) throws IOException {
-    FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
-    if (null == fileChannel) {
-      Path pt = new Path(filePath);
-      FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
-      fileChannel = fs.open(pt);
-      fileNameAndStreamCache.put(filePath, fileChannel);
-    }
-    return fileChannel;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read and position
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @param offset  position
-   * @return byte buffer
-   */
-  private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException {
-    byte[] byteBffer = new byte[size];
-    channel.seek(offset);
-    channel.readFully(byteBffer);
-    return byteBffer;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @return byte buffer
-   */
-  private byte[] read(FSDataInputStream channel, int size) throws IOException {
-    byte[] byteBffer = new byte[size];
-    channel.readFully(byteBffer);
-    return byteBffer;
-  }
-
-  @Override public int readInt(String filePath, long offset) throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    fileChannel.seek(offset);
-    return fileChannel.readInt();
-  }
-
-  @Override public long readDouble(String filePath, long offset) throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    fileChannel.seek(offset);
-    return fileChannel.readLong();
-  }
-
-  @Override public void finish() throws IOException {
-    for (Entry<String, FSDataInputStream> entry : 
fileNameAndStreamCache.entrySet()) {
-      FSDataInputStream channel = entry.getValue();
-      if (null != channel) {
-        channel.close();
-      }
-    }
-  }
-
-  @Override public byte[] readByteArray(String filePath, int length) throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    return read(fileChannel, length);
-  }
-
-  @Override public long readLong(String filePath, long offset) throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    fileChannel.seek(offset);
-    return fileChannel.readLong();
-  }
-
-  @Override public int readInt(String filePath) throws IOException {
-    FSDataInputStream fileChannel = updateCache(filePath);
-    return fileChannel.readInt();
-  }
-}

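A minimal sketch of the FileHolder read pattern used above: one cached stream per file path, positional reads, and a single finish() that closes every cached stream (the path and offsets are hypothetical):

    import java.io.IOException;
    import org.apache.carbondata.core.datastorage.store.FileHolder;
    import org.apache.carbondata.core.datastorage.store.impl.FileFactory;

    public class FileHolderSketch {
      public static void main(String[] args) throws IOException {
        String path = "hdfs://localhost:8020/carbon/store/part-0"; // hypothetical
        FileHolder holder = FileFactory.getFileHolder(FileFactory.getFileType(path));
        try {
          int header = holder.readInt(path, 0L);             // 4 bytes at offset 0
          byte[] block = holder.readByteArray(path, 4L, 16); // 16 bytes at offset 4
          System.out.println(header + ", " + block.length);
        } finally {
          holder.finish(); // closes every stream in the cache
        }
      }
    }
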
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
deleted file mode 100644
index f0c424b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.impl;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.zip.GZIPInputStream;
-
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastorage.store.filesystem.AlluxioCarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.GzipCodec;
-
-public final class FileFactory {
-  private static Configuration configuration = null;
-
-  static {
-    configuration = new Configuration();
-    configuration.addResource(new Path("../core-default.xml"));
-  }
-
-  private FileFactory() {
-
-  }
-
-  public static Configuration getConfiguration() {
-    return configuration;
-  }
-
-  public static FileHolder getFileHolder(FileType fileType) {
-    switch (fileType) {
-      case LOCAL:
-        return new FileHolderImpl();
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        return new DFSFileHolderImpl();
-      default:
-        return new FileHolderImpl();
-    }
-  }
-
-  public static FileType getFileType(String path) {
-    if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
-      return FileType.HDFS;
-    }
-    else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
-      return FileType.ALLUXIO;
-    }
-    else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
-      return FileType.VIEWFS;
-    }
-    return FileType.LOCAL;
-  }
-
-  public static CarbonFile getCarbonFile(String path, FileType fileType) {
-    switch (fileType) {
-      case LOCAL:
-        return new LocalCarbonFile(path);
-      case HDFS:
-        return new HDFSCarbonFile(path);
-      case ALLUXIO:
-        return new AlluxioCarbonFile(path);
-      case VIEWFS:
-        return new ViewFSCarbonFile(path);
-      default:
-        return new LocalCarbonFile(path);
-    }
-  }
-
-  public static DataInputStream getDataInputStream(String path, FileType fileType)
-      throws IOException {
-    return getDataInputStream(path, fileType, -1);
-  }
-
-  public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
-      throws IOException {
-    path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
-    switch (fileType) {
-      case LOCAL:
-        if (gzip) {
-          stream = new GZIPInputStream(new FileInputStream(path));
-        } else if (bzip2) {
-          stream = new BZip2CompressorInputStream(new FileInputStream(path));
-        } else {
-          stream = new FileInputStream(path);
-        }
-        break;
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        if (bufferSize == -1) {
-          stream = fs.open(pt);
-        } else {
-          stream = fs.open(pt, bufferSize);
-        }
-        if (gzip) {
-          GzipCodec codec = new GzipCodec();
-          stream = codec.createInputStream(stream);
-        } else if (bzip2) {
-          BZip2Codec codec = new BZip2Codec();
-          stream = codec.createInputStream(stream);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("unsupported file system");
-    }
-    return new DataInputStream(new BufferedInputStream(stream));
-  }
-
-  /**
-   * Returns a DataInputStream seeked to the given offset of the file
-   *
-   * @param path
-   * @param fileType
-   * @param bufferSize
-   * @param offset
-   * @return DataInputStream
-   * @throws IOException
-   */
-  public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
-      long offset) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataInputStream stream = fs.open(pt, bufferSize);
-        stream.seek(offset);
-        return new DataInputStream(new BufferedInputStream(stream));
-      default:
-        FileInputStream fis = new FileInputStream(path);
-        long actualSkipSize = 0;
-        long skipSize = offset;
-        while (actualSkipSize != offset) {
-          actualSkipSize += fis.skip(skipSize);
-          skipSize = offset - actualSkipSize; // bytes still to skip, not the cumulative total
-        }
-        return new DataInputStream(new BufferedInputStream(fis));
-    }
-  }
-
-  public static DataOutputStream getDataOutputStream(String path, FileType fileType)
-      throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataOutputStream stream = fs.create(pt, true);
-        return stream;
-      default:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-    }
-  }
-
-  public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
-      boolean append) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataOutputStream stream = null;
-        if (append) {
-          // append to the file only if it already exists; otherwise HDFS
-          // would throw FileNotFoundException on append
-          if (CarbonUtil.isFileExists(path)) {
-            stream = fs.append(pt, bufferSize);
-          } else {
-            stream = fs.create(pt, true, bufferSize);
-          }
-        } else {
-          stream = fs.create(pt, true, bufferSize);
-        }
-        return stream;
-      default:
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-    }
-  }
-
-  public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
-      long blockSize) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataOutputStream stream =
-            fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
-        return stream;
-      default:
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-    }
-  }
-
-  /**
-   * This method checks whether the given path exists and, when
-   * performFileCheck is true, whether it is a file
-   *
-   * @param filePath         - Path
-   * @param fileType         - FileType Local/HDFS
-   * @param performFileCheck - Provide false for folders, true for files
-   */
-  public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
-      throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        if (performFileCheck) {
-          return fs.exists(path) && fs.isFile(path);
-        } else {
-          return fs.exists(path);
-        }
-
-      case LOCAL:
-      default:
-        File defaultFile = new File(filePath);
-
-        if (performFileCheck) {
-          return defaultFile.exists() && defaultFile.isFile();
-        } else {
-          return defaultFile.exists();
-        }
-    }
-  }
-
-  /**
-   * This method checks whether the given path exists
-   * (no distinction is made between files and folders)
-   *
-   * @param filePath - Path
-   * @param fileType - FileType Local/HDFS
-   */
-  public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.exists(path);
-
-      case LOCAL:
-      default:
-        File defaultFile = new File(filePath);
-        return defaultFile.exists();
-    }
-  }
-
-  public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.createNewFile(path);
-
-      case LOCAL:
-      default:
-        File file = new File(filePath);
-        return file.createNewFile();
-    }
-  }
-
-  public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.delete(path, true);
-
-      case LOCAL:
-      default:
-        File file = new File(filePath);
-        return deleteAllFilesOfDir(file);
-    }
-  }
-
-  public static boolean deleteAllFilesOfDir(File path) {
-    if (!path.exists()) {
-      return true;
-    }
-    if (path.isFile()) {
-      return path.delete();
-    }
-    File[] files = path.listFiles();
-    if (null == files) {
-      // listFiles() returns null on I/O error; nothing more can be deleted
-      return false;
-    }
-    for (int i = 0; i < files.length; i++) {
-      deleteAllFilesOfDir(files[i]);
-    }
-    return path.delete();
-  }
-
-  public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.mkdirs(path);
-      case LOCAL:
-      default:
-        File file = new File(filePath);
-        return file.mkdirs();
-    }
-  }
-
-  /**
-   * For getting the DataOutputStream using the HDFS FileSystem append API.
-   *
-   * @param path
-   * @param fileType
-   * @return
-   * @throws IOException
-   */
-  public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
-      throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataOutputStream stream = fs.append(pt);
-        return stream;
-      default:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-    }
-  }
-
-  /**
-   * For creating a new lock file; if created successfully it is registered
-   * for deletion on exit, so it is cleaned up even after an abrupt shutdown.
-   *
-   * @param filePath
-   * @param fileType
-   * @return
-   * @throws IOException
-   */
-  public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        if (fs.createNewFile(path)) {
-          fs.deleteOnExit(path);
-          return true;
-        }
-        return false;
-      case LOCAL:
-      default:
-        File file = new File(filePath);
-        return file.createNewFile();
-    }
-  }
-
-  public enum FileType {
-    LOCAL, HDFS, ALLUXIO, VIEWFS
-  }
-
-  /**
-   * Below method will be used to update the file path:
-   * for the local type
-   * it removes the file:/ scheme prefix from the path
-   *
-   * @param filePath
-   * @return updated file path without url for local
-   */
-  public static String getUpdatedFilePath(String filePath) {
-    FileType fileType = getFileType(filePath);
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        return filePath;
-      case LOCAL:
-      default:
-        Path pathWithoutSchemeAndAuthority =
-            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
-        return pathWithoutSchemeAndAuthority.toString();
-    }
-  }
-
-  /**
-   * It computes size of directory
-   *
-   * @param filePath
-   * @return size in bytes
-   * @throws IOException
-   */
-  public static long getDirectorySize(String filePath) throws IOException {
-    FileType fileType = getFileType(filePath);
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.getContentSummary(path).getLength();
-      case LOCAL:
-      default:
-        File file = new File(filePath);
-        return FileUtils.sizeOfDirectory(file);
-    }
-  }
-
-}

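FileFactory dispatches purely on the path prefix (assuming the usual hdfs://, alluxio:// and viewfs:// values behind the CarbonUtil prefix constants; everything else falls back to local). A short sketch of the round trip (the namenode address is hypothetical):

    import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastorage.store.impl.FileFactory;

    public class FileFactorySketch {
      public static void main(String[] args) {
        String path = "hdfs://localhost:8020/carbon/store"; // hypothetical
        FileFactory.FileType type = FileFactory.getFileType(path); // -> FileType.HDFS
        CarbonFile file = FileFactory.getCarbonFile(path, type);   // -> HDFSCarbonFile
        System.out.println(type + " -> " + file.getClass().getSimpleName());
      }
    }
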
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
deleted file mode 100644
index 9a5c06a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.impl;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-
-public class FileHolderImpl implements FileHolder {
-  /**
-   * cache to hold filename and its stream
-   */
-  private Map<String, FileChannel> fileNameAndStreamCache;
-
-  /**
-   * FileHolderImpl Constructor
-   * It will create the cache
-   */
-  public FileHolderImpl() {
-    this.fileNameAndStreamCache =
-        new HashMap<String, 
FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  public FileHolderImpl(int capacity) {
-    this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
-  }
-
-  /**
-   * This method will be used to read the byte array from file based on offset
-   * and length (number of bytes) to be read
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @param length   number of bytes to be read
-   * @return read byte array
-   */
-  @Override public byte[] readByteArray(String filePath, long offset, int length)
-      throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, length, offset);
-    return byteBffer.array();
-  }
-
-  /**
-   * This method will be used to close all the streams currently present in the cache
-   */
-  @Override public void finish() throws IOException {
-    for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) 
{
-      FileChannel channel = entry.getValue();
-      if (null != channel) {
-        channel.close();
-      }
-    }
-  }
-
-  /**
-   * This method will be used to read an int from file from position(offset);
-   * here the length will always be 4 because the int byte size is 4
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read int
-   */
-  @Override public int readInt(String filePath, long offset) throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset);
-    return byteBffer.getInt();
-  }
-
-  /**
-   * This method will be used to read an int from the file's current position;
-   * the length will always be 4 because the int byte size is 4
-   *
-   * @param filePath fully qualified file path
-   * @return read int
-   */
-  @Override public int readInt(String filePath) throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE);
-    return byteBffer.getInt();
-  }
-
-  /**
-   * This method will be used to read a long from file from position(offset);
-   * here the length will always be 8 because the long byte size is 8
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read long
-   */
-  @Override public long readDouble(String filePath, long offset) throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
-    return byteBffer.getLong();
-  }
-
-  /**
-   * This method will be used to check whether a stream for the file path is
-   * already present in the cache; if not, it is created and added to the
-   * cache, otherwise it is fetched from the cache
-   *
-   * @param filePath fully qualified file path
-   * @return channel
-   */
-  private FileChannel updateCache(String filePath) throws FileNotFoundException {
-    FileChannel fileChannel = fileNameAndStreamCache.get(filePath);
-    if (null == fileChannel) {
-      FileInputStream stream = new FileInputStream(filePath);
-      fileChannel = stream.getChannel();
-      fileNameAndStreamCache.put(filePath, fileChannel);
-    }
-    return fileChannel;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read and position
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @param offset  position
-   * @return byte buffer
-   */
-  private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException {
-    ByteBuffer byteBffer = ByteBuffer.allocate(size);
-    channel.position(offset);
-    channel.read(byteBffer);
-    byteBffer.rewind();
-    return byteBffer;
-  }
-
-  /**
-   * This method will be used to read from file based on number of bytes to be read
-   *
-   * @param channel file channel
-   * @param size    number of bytes
-   * @return byte buffer
-   */
-  private ByteBuffer read(FileChannel channel, int size) throws IOException {
-    ByteBuffer byteBffer = ByteBuffer.allocate(size);
-    channel.read(byteBffer);
-    byteBffer.rewind();
-    return byteBffer;
-  }
-
-
-  /**
-   * This method will be used to read the byte array from file based on length (number of bytes)
-   *
-   * @param filePath fully qualified file path
-   * @param length   number of bytes to be read
-   * @return read byte array
-   */
-  @Override public byte[] readByteArray(String filePath, int length) throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, length);
-    return byteBffer.array();
-  }
-
-  /**
-   * This method will be used to read a long from file from position(offset);
-   * here the length will always be 8 because the long byte size is 8
-   *
-   * @param filePath fully qualified file path
-   * @param offset   reading start position,
-   * @return read long
-   */
-  @Override public long readLong(String filePath, long offset) throws IOException {
-    FileChannel fileChannel = updateCache(filePath);
-    ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
-    return byteBffer.getLong();
-  }
-
-}
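
For reference, the removed reader's core pattern: open one FileChannel per file
path, cache it, and reuse it for positioned reads. A minimal standalone sketch
of that pattern follows; CachedChannelReader and channelCache are illustrative
names, not CarbonData identifiers.

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachedChannelReader {
  // one channel per file path, created lazily and reused across reads
  private final Map<String, FileChannel> channelCache = new ConcurrentHashMap<>();

  private FileChannel channelFor(String filePath) throws IOException {
    FileChannel channel = channelCache.get(filePath);
    if (channel == null) {
      channel = new FileInputStream(filePath).getChannel();
      channelCache.put(filePath, channel);
    }
    return channel;
  }

  // positioned read: allocate, seek, read, rewind so the caller starts at byte 0
  public long readLong(String filePath, long offset) throws IOException {
    FileChannel channel = channelFor(filePath);
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    channel.position(offset);
    channel.read(buffer);
    buffer.rewind();
    return buffer.getLong();
  }
}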

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index ad1d62f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.impl.data.compressed;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public abstract class AbstractHeavyCompressedDoubleArrayDataStore
-    implements NodeMeasureDataStore {
-
-  private LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName());
-
-  /**
-   * value compression holder for each measure column.
-   */
-  protected ValueCompressionHolder[] values;
-
-  /**
-   * compression model holding the compression metadata for the measures.
-   */
-  protected WriterCompressModel compressionModel;
-
-  /**
-   * data type character for each measure
-   */
-  private char[] type;
-
-  /**
-   * AbstractHeavyCompressedDoubleArrayDataStore constructor.
-   *
-   * @param compressionModel writer compression model for the measure columns
-   */
-  public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
-    if (null != compressionModel) {
-      this.type = compressionModel.getType();
-      values =
-          new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
-    }
-  }
-
-  // this method first invokes the encoding routine to encode the data chunk,
-  // followed by invoking the compression routine to prepare the data chunk for writing
-  @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
-    byte[][] returnValue = new byte[values.length][];
-    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
-      values[i] = compressionModel.getValueCompressionHolder()[i];
-      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE) {
-        // first perform encoding of the data chunk
-        values[i].setValue(
-            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
-                .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
-                    compressionModel.getMaxValue()[i],
-                    compressionModel.getMantissa()[i]));
-      } else {
-        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
-      }
-      values[i].compress();
-      returnValue[i] = values[i].getCompressedData();
-    }
-
-    return returnValue;
-  }
-
-}
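
The write path above is a two-step pipeline per measure column: encode the
values first, then compress the encoded bytes. A hedged sketch of that flow,
using java.util.zip as a stand-in for CarbonData's ValueCompressor and
ValueCompressionHolder machinery; the delta encoding against the column max is
illustrative, not the exact CarbonData scheme.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.DeflaterOutputStream;

public class EncodeThenCompress {

  // step 1 (encode): store each double as its delta from the column max value,
  // so the bytes reaching the compressor have smaller, more repetitive magnitudes
  static byte[] encodeAsDeltas(double[] column, double maxValue) {
    ByteBuffer buffer = ByteBuffer.allocate(column.length * Double.BYTES);
    for (double value : column) {
      buffer.putDouble(maxValue - value);
    }
    return buffer.array();
  }

  // step 2 (compress): run a general-purpose compressor over the encoded bytes
  static byte[] compress(byte[] encoded) throws IOException {
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(compressed)) {
      out.write(encoded);
    }
    return compressed.toByteArray();
  }
}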

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
deleted file mode 100644
index 0c29143..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.impl.data.compressed;
-
-import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
-
-public class HeavyCompressedDoubleArrayDataInMemoryStore
-    extends AbstractHeavyCompressedDoubleArrayDataStore {
-
-  public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel compressionModel) {
-    super(compressionModel);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java
deleted file mode 100644
index 7561ec7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.util;
-
-import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastorage.store.impl.data.compressed.HeavyCompressedDoubleArrayDataInMemoryStore;
-
-public final class StoreFactory {
-
-  private StoreFactory() {
-  }
-
-  public static NodeMeasureDataStore createDataStore(WriterCompressModel compressionModel) {
-    return new HeavyCompressedDoubleArrayDataInMemoryStore(compressionModel);
-  }
-
-}
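
Callers obtained a store through this factory along the following lines. This
is only a usage sketch: the compression model and data holders come from the
write path and are taken as parameters here to keep the snippet self-contained.

import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore;
import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel;
import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;
import org.apache.carbondata.core.datastorage.util.StoreFactory;

class MeasureWriteExample {
  // produce the writable (encoded and compressed) byte arrays for the measures
  static byte[][] writeMeasures(WriterCompressModel compressionModel,
      CarbonWriteDataHolder[] dataHolders) {
    NodeMeasureDataStore dataStore = StoreFactory.createDataStore(compressionModel);
    return dataStore.getWritableMeasureDataArray(dataHolders);
  }
}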

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java b/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
new file mode 100644
index 0000000..cf7bf21
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockInfo;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * This class validates and loads the B-Tree into the executor LRU cache.
+ *
+ * @param <K> cache key
+ * @param <V> block metadata details
+ */
+public abstract class AbstractBlockIndexStoreCache<K, V>
+    implements Cache<TableBlockUniqueIdentifier, AbstractIndex> {
+  /**
+   * carbon store path
+   */
+  protected String carbonStorePath;
+  /**
+   * CarbonLRU cache
+   */
+  protected CarbonLRUCache lruCache;
+
+  /**
+   * mapping of table segment id to its list of block infos
+   */
+  protected Map<String, List<BlockInfo>> segmentIdToBlockListMap;
+
+  /**
+   * map of block info to lock object. While loading the B-tree an entry is added
+   * for the block info and removed once the tree for that block is loaded. This is
+   * useful when loading the tree concurrently: the lock is applied at block level,
+   * so other blocks can be loaded in parallel.
+   */
+  protected Map<BlockInfo, Object> blockInfoLock;
+
+  /**
+   * holds a lock object per segment ID so that, at any time, only one block belonging
+   * to the same segment and table can create the list holding the block infos
+   */
+  protected Map<String, Object> segmentIDLock;
+
+  public AbstractBlockIndexStoreCache(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
+    blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
+    segmentIDLock = new ConcurrentHashMap<String, Object>();
+    segmentIdToBlockListMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * This method reserves space for the given block in the LRU cache. If the block
+   * is admitted, it reads the data file footer and builds the block index.
+   *
+   * @param tableBlock block whose index is to be built
+   * @param tableBlockUniqueIdentifier unique identifier of the table block
+   * @param lruCacheKey key under which the block is cached
+   */
+  protected void checkAndLoadTableBlocks(AbstractIndex tableBlock,
+      TableBlockUniqueIdentifier tableBlockUniqueIdentifier, String lruCacheKey)
+      throws IOException {
+    // calculate the required metadata size
+    TableBlockInfo blockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
+    long requiredMetaSize = CarbonUtil.calculateMetaSize(blockInfo);
+    if (requiredMetaSize > 0) {
+      tableBlock.setMemorySize(requiredMetaSize);
+      tableBlock.incrementAccessCount();
+      boolean isTableBlockAddedToLruCache = lruCache.put(lruCacheKey, tableBlock, requiredMetaSize);
+      // load the table block data only if the block was successfully added
+      // to the LRU cache
+      if (isTableBlockAddedToLruCache) {
+        // load table blocks data
+        // getting the data file meta data of the block
+        DataFileFooter footer = CarbonUtil.readMetadatFile(blockInfo);
+        footer.setBlockInfo(new BlockInfo(blockInfo));
+        // building the block
+        tableBlock.buildIndex(Collections.singletonList(footer));
+      } else {
+        throw new IndexBuilderException(
+            "Cannot load table blocks into memory. Not enough memory 
available");
+      }
+    }
+  }
+}
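
checkAndLoadTableBlocks follows an admit-before-load discipline: space is
reserved in the LRU cache first, and the expensive footer read and index build
run only if admission succeeds. A minimal generic sketch of that discipline,
independent of the CarbonData types; the LruCache interface here is a
hypothetical stand-in for CarbonLRUCache.

import java.io.IOException;

// hypothetical minimal cache interface; CarbonLRUCache plays this role above
interface LruCache<K, V> {
  // returns false when the required size cannot be accommodated
  boolean put(K key, V value, long requiredSize);
}

class AdmitThenLoad<K, V> {
  interface Loader<V> {
    // the expensive work, e.g. reading the footer and building the index
    void load(V value) throws IOException;
  }

  private final LruCache<K, V> cache;

  AdmitThenLoad(LruCache<K, V> cache) {
    this.cache = cache;
  }

  // reserve space first; run the expensive load only if the entry was admitted
  void checkAndLoad(K key, V value, long requiredSize, Loader<V> loader) throws IOException {
    if (!cache.put(key, value, requiredSize)) {
      throw new IOException("Cannot load table blocks into memory. Not enough memory available");
    }
    loader.load(value);
  }
}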

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java
new file mode 100644
index 0000000..f1b0bf7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+
+/**
+ * This class holds the metadata required to build the blocks.
+ */
+public class BTreeBuilderInfo {
+
+  /**
+   * holds all the information about data
+   * file meta data
+   */
+  private List<DataFileFooter> footerList;
+
+  /**
+   * value size of each dimension column,
+   * which will be useful while reading
+   */
+  private int[] dimensionColumnValueSize;
+
+  public BTreeBuilderInfo(List<DataFileFooter> footerList,
+      int[] dimensionColumnValueSize) {
+    this.dimensionColumnValueSize = dimensionColumnValueSize;
+    this.footerList = footerList;
+  }
+
+  /**
+   * @return the dimensionColumnValueSize
+   */
+  public int[] getDimensionColumnValueSize() {
+    return dimensionColumnValueSize;
+  }
+
+  /**
+   * @return the footerList
+   */
+  public List<DataFileFooter> getFooterList() {
+    return footerList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
new file mode 100644
index 0000000..8e93a5b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.BlockIndex;
+import org.apache.carbondata.core.datastore.block.BlockInfo;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * This class is used to load the B-Tree into the executor LRU cache
+ */
+public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
+
+  /**
+   * LOGGER instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockIndexStore.class.getName());
+
+  public BlockIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    super(carbonStorePath, lruCache);
+  }
+
+  /**
+   * The method loads the block metadata into the B-tree LRU cache and returns it.
+   *
+   * @param tableBlockUniqueIdentifier Uniquely identifies the block
+   * @return the block's B-Tree metadata
+   */
+  @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
+      throws IOException {
+    TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
+    BlockInfo blockInfo = new BlockInfo(tableBlockInfo);
+    String lruCacheKey =
+        getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
+    AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey);
+
+    // if block is not loaded
+    if (null == tableBlock) {
+      // check whether a lock object is already present in the block info lock map
+      Object blockInfoLockObject = blockInfoLock.get(blockInfo);
+      // if no lock object is present, acquire the lock on the block info lock map and
+      // add a lock object for this block info; double-checked so that under concurrent
+      // queries only one lock object is added per block info
+      if (null == blockInfoLockObject) {
+        synchronized (blockInfoLock) {
+          // check the block info lock map again; if the lock object is still absent, add one
+          blockInfoLockObject = blockInfoLock.get(blockInfo);
+          if (null == blockInfoLockObject) {
+            blockInfoLockObject = new Object();
+            blockInfoLock.put(blockInfo, blockInfoLockObject);
+          }
+        }
+      }
+      // acquire the lock for this particular block info
+      synchronized (blockInfoLockObject) {
+        // check again whether the block is present, to avoid loading the same block
+        // more than once under concurrent queries
+        tableBlock = (AbstractIndex) lruCache.get(
+            getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+        // if the block is still not present then load it
+        if (null == tableBlock) {
+          tableBlock = loadBlock(tableBlockUniqueIdentifier);
+          fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(),
+              blockInfo);
+        }
+      }
+    } else {
+      tableBlock.incrementAccessCount();
+    }
+    return tableBlock;
+  }
+
+  /**
+   * Registers the given block info in the list maintained for its segment.
+   *
+   * @param absoluteTableIdentifier absolute table identifier
+   * @param blockInfo block info to register
+   */
+  private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier,
+      BlockInfo blockInfo) {
+    TableSegmentUniqueIdentifier segmentIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier,
+            blockInfo.getTableBlockInfo().getSegmentId());
+    String uniqueTableSegmentIdentifier = segmentIdentifier.getUniqueTableSegmentIdentifier();
+    List<BlockInfo> blockInfos =
+        segmentIdToBlockListMap.get(uniqueTableSegmentIdentifier);
+    if (null == blockInfos) {
+      Object segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
+      if (null == segmentLockObject) {
+        synchronized (segmentIDLock) {
+          segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier);
+          if (null == segmentLockObject) {
+            segmentLockObject = new Object();
+            segmentIDLock.put(uniqueTableSegmentIdentifier, segmentLockObject);
+          }
+        }
+      }
+      synchronized (segmentLockObject) {
+        blockInfos =
+            segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier());
+        if (null == blockInfos) {
+          blockInfos = new CopyOnWriteArrayList<>();
+          segmentIdToBlockListMap.put(uniqueTableSegmentIdentifier, blockInfos);
+        }
+        blockInfos.add(blockInfo);
+      }
+    } else {
+      blockInfos.add(blockInfo);
+    }
+  }
+
+  /**
+   * The method takes a list of table blocks as input, loads them into the B-tree
+   * LRU cache, and returns the list of data block metadata
+   *
+   * @param tableBlocksInfos List of unique table blocks
+   * @return List<AbstractIndex>
+   * @throws IndexBuilderException
+   */
+  @Override public List<AbstractIndex> getAll(List<TableBlockUniqueIdentifier> tableBlocksInfos)
+      throws IndexBuilderException {
+    AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()];
+    int numberOfCores = 1;
+    try {
+      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException e) {
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(numberOfCores);
+    List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>();
+    for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : tableBlocksInfos) {
+      blocksList.add(executor.submit(new BlockLoaderThread(tableBlockUniqueIdentifier)));
+    }
+    // shut down the executor gracefully and wait until all the tasks are finished
+    executor.shutdown();
+    try {
+      executor.awaitTermination(1, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      throw new IndexBuilderException(e);
+    }
+    // fill the blocks loaded by the threads into the loaded-blocks array
+    fillLoadedBlocks(loadedBlock, blocksList);
+    return Arrays.asList(loadedBlock);
+  }
+
+  private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier,
+      BlockInfo blockInfo) {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR
+        + blockInfo.getBlockUniqueName();
+
+  /**
+   * Returns the B-Tree metadata if it is present in the cache, otherwise null.
+   *
+   * @param tableBlockUniqueIdentifier Unique table block info
+   * @return the cached block index, or null if not present
+   */
+  @Override public AbstractIndex getIfPresent(
+      TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    BlockIndex cacheable = (BlockIndex) lruCache
+        .get(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+    if (null != cacheable) {
+      cacheable.incrementAccessCount();
+    }
+    return cacheable;
+  }
+
+  /**
+   * The method removes the entry for the given block from the cache.
+   *
+   * @param tableBlockUniqueIdentifier unique identifier of the block to evict
+   */
+  @Override public void invalidate(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    lruCache
+        .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
+  }
+
+  @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) {
+    for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) {
+      SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
+          .get(tableBlockUniqueIdentifier.getUniqueTableBlockName());
+      cacheable.clear();
+    }
+  }
+
+  /**
+   * Below method will be used to fill the loaded blocks into the array
+   * which will be used for query execution
+   *
+   * @param loadedBlockArray array of blocks to be filled
+   * @param blocksList       blocks loaded by the loader threads
+   * @throws IndexBuilderException in case of any failure
+   */
+  private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray,
+      List<Future<AbstractIndex>> blocksList) throws IndexBuilderException {
+    int blockCounter = 0;
+    boolean exceptionOccurred = false;
+    Throwable exceptionRef = null;
+    for (int i = 0; i < loadedBlockArray.length; i++) {
+      try {
+        loadedBlockArray[i] = blocksList.get(blockCounter++).get();
+      } catch (Throwable e) {
+        exceptionOccurred = true;
+        exceptionRef = e;
+      }
+    }
+    if (exceptionOccurred) {
+      LOGGER.error("Block B-tree loading failed. Clearing the access count of 
the loaded blocks.");
+      // in case of any failure clear the access count of the successfully loaded blocks
+      clearAccessCountForLoadedBlocks(loadedBlockArray);
+      throw new IndexBuilderException("Block B-tree loading failed", 
exceptionRef);
+    }
+  }
+
+  /**
+   * This method will clear the access count for the loaded blocks
+   *
+   * @param loadedBlockArray blocks whose access count should be cleared
+   */
+  private void clearAccessCountForLoadedBlocks(AbstractIndex[] loadedBlockArray) {
+    for (int i = 0; i < loadedBlockArray.length; i++) {
+      if (null != loadedBlockArray[i]) {
+        loadedBlockArray[i].clear();
+      }
+    }
+  }
+
+  /**
+   * Callable task used to load the blocks
+   */
+  private class BlockLoaderThread implements Callable<AbstractIndex> {
+    // table block unique identifier
+    private TableBlockUniqueIdentifier tableBlockUniqueIdentifier;
+
+    private BlockLoaderThread(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) {
+      this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier;
+    }
+
+    @Override public AbstractIndex call() throws Exception {
+      // load and return the loaded blocks
+      return get(tableBlockUniqueIdentifier);
+    }
+  }
+
+  private AbstractIndex loadBlock(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
+      throws IOException {
+    AbstractIndex tableBlock = new BlockIndex();
+    BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo());
+    String lruCacheKey =
+        getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
+    checkAndLoadTableBlocks(tableBlock, tableBlockUniqueIdentifier, lruCacheKey);
+    // finally remove the lock object from the block info lock map; once the block
+    // is loaded, callers will not enter the loading branch again
+    blockInfoLock.remove(blockInfo);
+    return tableBlock;
+  }
+
+  /**
+   * This will be used to remove the blocks of particular segments, which is useful
+   * when some blocks are deleted, for example due to retention or a similar scenario
+   *
+   * @param segmentIds              list of segment ids whose blocks are to be removed
+   * @param absoluteTableIdentifier absolute table identifier
+   */
+  public void removeTableBlocks(List<String> segmentIds,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    if (null == segmentIds) {
+      return;
+    }
+    for (String segmentId : segmentIds) {
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+          new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+      List<BlockInfo> blockInfos = segmentIdToBlockListMap
+          .remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      if (null != blockInfos) {
+        for (BlockInfo blockInfo : blockInfos) {
+          String lruCacheKey = getLruCacheKey(absoluteTableIdentifier, blockInfo);
+          lruCache.remove(lruCacheKey);
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes table blocks at the executor level if horizontal compaction has been done.
+   *
+   * @param queryModel query model carrying the invalid-segment details
+   */
+  public void removeTableBlocksIfHorizontalCompactionDone(QueryModel queryModel) {
+    // get the invalid segments blocks details
+    Map<String, UpdateVO> invalidBlocksVO = queryModel.getInvalidBlockVOForSegmentId();
+    if (!invalidBlocksVO.isEmpty()) {
+      UpdateVO updateMetadata;
+      Iterator<Map.Entry<String, UpdateVO>> itr = invalidBlocksVO.entrySet().iterator();
+      String blockTimestamp = null;
+      while (itr.hasNext()) {
+        Map.Entry<String, UpdateVO> entry = itr.next();
+        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+            new TableSegmentUniqueIdentifier(queryModel.getAbsoluteTableIdentifier(),
+                entry.getKey());
+        List<BlockInfo> blockInfos = segmentIdToBlockListMap
+            .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+        if (null != blockInfos) {
+          for (BlockInfo blockInfo : blockInfos) {
+            // reading the updated block names from status manager instance
+            blockTimestamp = blockInfo.getBlockUniqueName()
+                .substring(blockInfo.getBlockUniqueName().lastIndexOf('-') + 1,
+                    blockInfo.getBlockUniqueName().length());
+            updateMetadata = entry.getValue();
+            if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(Long.parseLong(blockTimestamp))) {
+              Long blockTimeStamp = Long.parseLong(blockTimestamp);
+              if (blockTimeStamp > updateMetadata.getFactTimestamp() && (
+                  updateMetadata.getUpdateDeltaStartTimestamp() != null
+                      && blockTimeStamp < updateMetadata.getUpdateDeltaStartTimestamp())) {
+                String lruCacheKey =
+                    getLruCacheKey(queryModel.getAbsoluteTableIdentifier(), blockInfo);
+                lruCache.remove(lruCacheKey);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
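
The get path above hinges on per-key lock objects kept in a ConcurrentHashMap,
with double-checked lookups: concurrent queries for the same block serialize on
one lock while different blocks load in parallel. A condensed sketch of just
that pattern; KeyedLoader and its members are illustrative names.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class KeyedLoader<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Map<K, Object> locks = new ConcurrentHashMap<>();

  V get(K key, Function<K, V> loader) {
    V value = cache.get(key);
    if (value == null) {
      // one lock object per key; computeIfAbsent gives the same guarantee as the
      // explicit double-checked synchronized block in the code above
      Object lock = locks.computeIfAbsent(key, k -> new Object());
      synchronized (lock) {
        // re-check inside the lock so each key is loaded only once
        value = cache.get(key);
        if (value == null) {
          value = loader.apply(key);
          cache.put(key, value);
          // once loaded, later calls take the fast path and the lock is not needed
          locks.remove(key);
        }
      }
    }
    return value;
  }
}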

