http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java deleted file mode 100644 index 9ff0f59..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/AlluxioCarbonFile.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; - - - -public class AlluxioCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName()); - - public AlluxioCarbonFile(String filePath) { - super(filePath); - } - - public AlluxioCarbonFile(Path path) { - super(path); - } - - public AlluxioCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new AlluxioCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override - public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new AlluxioCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java deleted file mode 100644 index 642055b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFile.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.filesystem; - -public interface CarbonFile { - - String getAbsolutePath(); - - CarbonFile[] listFiles(CarbonFileFilter fileFilter); - - CarbonFile[] listFiles(); - - String getName(); - - boolean isDirectory(); - - boolean exists(); - - String getCanonicalPath(); - - CarbonFile getParentFile(); - - String getPath(); - - long getSize(); - - boolean renameTo(String changetoName); - - boolean renameForce(String changetoName); - - boolean delete(); - - boolean createNewFile(); - - long getLastModifiedTime(); - - boolean setLastModifiedTime(long timestamp); - - boolean truncate(String fileName, long validDataEndOffset); - - /** - * This method will be used to check whether a file has been modified or not - * - * @param fileTimeStamp time to be compared with latest timestamp of file - * @param endOffset file length to be compared with current length of file - * @return - */ - boolean isFileModified(long fileTimeStamp, long endOffset); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java deleted file mode 100644 index 7db3b2b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/CarbonFileFilter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.filesystem; - -public interface CarbonFileFilter { - boolean accept(CarbonFile file); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java deleted file mode 100644 index ebe18e4..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/HDFSCarbonFile.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; - -public class HDFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(HDFSCarbonFile.class.getName()); - - public HDFSCarbonFile(String filePath) { - super(filePath); - } - - public HDFSCarbonFile(Path path) { - super(path); - } - - public HDFSCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new HDFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override - public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new HDFSCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java deleted file mode 100644 index 406f6d1..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/LocalCarbonFile.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.filesystem; - -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.FileChannel; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.hadoop.fs.Path; - -public class LocalCarbonFile implements CarbonFile { - private static final LogService LOGGER = - LogServiceFactory.getLogService(LocalCarbonFile.class.getName()); - private File file; - - public LocalCarbonFile(String filePath) { - Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); - file = new File(pathWithoutSchemeAndAuthority.toString()); - } - - public LocalCarbonFile(File file) { - this.file = file; - } - - @Override public String getAbsolutePath() { - return file.getAbsolutePath(); - } - - @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - if (!file.isDirectory()) { - return null; - } - - File[] files = file.listFiles(new FileFilter() { - - @Override public boolean accept(File pathname) { - return fileFilter.accept(new LocalCarbonFile(pathname)); - } - }); - - if (files == null) { - return new CarbonFile[0]; - } - - CarbonFile[] carbonFiles = new CarbonFile[files.length]; - - for (int i = 0; i < carbonFiles.length; i++) { - carbonFiles[i] = new LocalCarbonFile(files[i]); - } - - return carbonFiles; - } - - @Override public String getName() { - return file.getName(); - } - - @Override public boolean isDirectory() { - return file.isDirectory(); - } - - @Override public boolean exists() { - if (file != null) { - return file.exists(); - } - return false; - } - - @Override public String getCanonicalPath() { - try { - return file.getCanonicalPath(); - } catch (IOException e) { - LOGGER - .error(e, "Exception occured" + e.getMessage()); - } - return null; - } - - @Override public CarbonFile getParentFile() { - return new LocalCarbonFile(file.getParentFile()); - } - - @Override public String getPath() { - return file.getPath(); - } - - @Override public long getSize() { - return file.length(); - } - - public boolean renameTo(String changetoName) { - return file.renameTo(new File(changetoName)); - } - - public boolean delete() { - return file.delete(); - } - - @Override public CarbonFile[] listFiles() { - - if (!file.isDirectory()) { - return null; - } - File[] files = file.listFiles(); - if (files == null) { - return new CarbonFile[0]; - } - CarbonFile[] carbonFiles = new CarbonFile[files.length]; - for (int i = 0; i < carbonFiles.length; i++) { - carbonFiles[i] = new LocalCarbonFile(files[i]); - } - - return carbonFiles; - - } - - @Override public boolean createNewFile() { - try { - return file.createNewFile(); - } catch (IOException e) { - return false; - } - } - - @Override public long getLastModifiedTime() { - return file.lastModified(); - } - - @Override public boolean setLastModifiedTime(long timestamp) { - return file.setLastModified(timestamp); - } - - /** - * This method will delete the data in file data from a given offset - */ - @Override public boolean truncate(String fileName, long validDataEndOffset) { - FileChannel source = null; - FileChannel destination = null; - boolean fileTruncatedSuccessfully = false; - // temporary file name - String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; - FileFactory.FileType fileType = FileFactory.getFileType(fileName); - try { - CarbonFile tempFile = null; - // delete temporary file if it already exists at a given path - if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - tempFile.delete(); - } - // create new temporary file - FileFactory.createNewFile(tempWriteFilePath, fileType); - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - source = new FileInputStream(fileName).getChannel(); - destination = new FileOutputStream(tempWriteFilePath).getChannel(); - long read = destination.transferFrom(source, 0, validDataEndOffset); - long totalBytesRead = read; - long remaining = validDataEndOffset - totalBytesRead; - // read till required data offset is not reached - while (remaining > 0) { - read = destination.transferFrom(source, totalBytesRead, remaining); - totalBytesRead = totalBytesRead + read; - remaining = remaining - totalBytesRead; - } - CarbonUtil.closeStreams(source, destination); - // rename the temp file to original file - tempFile.renameForce(fileName); - fileTruncatedSuccessfully = true; - } catch (IOException e) { - LOGGER.error("Exception occured while truncating the file " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(source, destination); - } - return fileTruncatedSuccessfully; - } - - /** - * This method will be used to check whether a file has been modified or not - * - * @param fileTimeStamp time to be compared with latest timestamp of file - * @param endOffset file length to be compared with current length of file - * @return - */ - @Override public boolean isFileModified(long fileTimeStamp, long endOffset) { - boolean isFileModified = false; - if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) { - isFileModified = true; - } - return isFileModified; - } - - @Override public boolean renameForce(String changetoName) { - File destFile = new File(changetoName); - if (destFile.exists()) { - if (destFile.delete()) { - return file.renameTo(new File(changetoName)); - } - } - - return file.renameTo(new File(changetoName)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java deleted file mode 100644 index 8f11b7a..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/filesystem/ViewFSCarbonFile.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.datastorage.store.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.viewfs.ViewFileSystem; - -public class ViewFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName()); - - public ViewFSCarbonFile(String filePath) { - super(filePath); - } - - public ViewFSCarbonFile(Path path) { - super(path); - } - - public ViewFSCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new ViewFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException ex) { - LOGGER.error("Exception occured" + ex.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new ViewFSCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof ViewFileSystem) { - fs.delete(new Path(changetoName), true); - fs.rename(fileStatus.getPath(), new Path(changetoName)); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured" + e.getMessage()); - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java deleted file mode 100644 index c9571b2..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/CompressedDataMeasureDataWrapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.impl; - -import org.apache.carbondata.core.datastorage.store.MeasureDataWrapper; -import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder; - -public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper { - - private final CarbonReadDataHolder[] values; - - public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) { - this.values = values; - } - - @Override public CarbonReadDataHolder[] getValues() { - return values; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java deleted file mode 100644 index 6e2f5d7..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/DFSFileHolderImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.datastorage.store.impl; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.FileHolder; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - - -public class DFSFileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FSDataInputStream> fileNameAndStreamCache; - - public DFSFileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length, offset); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - private FSDataInputStream updateCache(String filePath) throws IOException { - FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - Path pt = new Path(filePath); - FileSystem fs = FileSystem.get(FileFactory.getConfiguration()); - fileChannel = fs.open(pt); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException { - byte[] byteBffer = new byte[size]; - channel.seek(offset); - channel.readFully(byteBffer); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size) throws IOException { - byte[] byteBffer = new byte[size]; - channel.readFully(byteBffer); - return byteBffer; - } - - @Override public int readInt(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readInt(); - } - - @Override public long readDouble(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public void finish() throws IOException { - for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) { - FSDataInputStream channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length); - } - - @Override public long readLong(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public int readInt(String filePath) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return fileChannel.readInt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java deleted file mode 100644 index f0c424b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.impl; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.GZIPInputStream; - -import org.apache.carbondata.core.datastorage.store.FileHolder; -import org.apache.carbondata.core.datastorage.store.filesystem.AlluxioCarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.HDFSCarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.LocalCarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.ViewFSCarbonFile; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.GzipCodec; - -public final class FileFactory { - private static Configuration configuration = null; - - static { - configuration = new Configuration(); - configuration.addResource(new Path("../core-default.xml")); - } - - private FileFactory() { - - } - - public static Configuration getConfiguration() { - return configuration; - } - - public static FileHolder getFileHolder(FileType fileType) { - switch (fileType) { - case LOCAL: - return new FileHolderImpl(); - case HDFS: - case ALLUXIO: - case VIEWFS: - return new DFSFileHolderImpl(); - default: - return new FileHolderImpl(); - } - } - - public static FileType getFileType(String path) { - if (path.startsWith(CarbonUtil.HDFS_PREFIX)) { - return FileType.HDFS; - } - else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) { - return FileType.ALLUXIO; - } - else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) { - return FileType.VIEWFS; - } - return FileType.LOCAL; - } - - public static CarbonFile getCarbonFile(String path, FileType fileType) { - switch (fileType) { - case LOCAL: - return new LocalCarbonFile(path); - case HDFS: - return new HDFSCarbonFile(path); - case ALLUXIO: - return new AlluxioCarbonFile(path); - case VIEWFS: - return new ViewFSCarbonFile(path); - default: - return new LocalCarbonFile(path); - } - } - - public static DataInputStream getDataInputStream(String path, FileType fileType) - throws IOException { - return getDataInputStream(path, fileType, -1); - } - - public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize) - throws IOException { - path = path.replace("\\", "/"); - boolean gzip = path.endsWith(".gz"); - boolean bzip2 = path.endsWith(".bz2"); - InputStream stream; - switch (fileType) { - case LOCAL: - if (gzip) { - stream = new GZIPInputStream(new FileInputStream(path)); - } else if (bzip2) { - stream = new BZip2CompressorInputStream(new FileInputStream(path)); - } else { - stream = new FileInputStream(path); - } - break; - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - if (bufferSize == -1) { - stream = fs.open(pt); - } else { - stream = fs.open(pt, bufferSize); - } - if (gzip) { - GzipCodec codec = new GzipCodec(); - stream = codec.createInputStream(stream); - } else if (bzip2) { - BZip2Codec codec = new BZip2Codec(); - stream = codec.createInputStream(stream); - } - break; - default: - throw new UnsupportedOperationException("unsupported file system"); - } - return new DataInputStream(new BufferedInputStream(stream)); - } - - /** - * return the datainputStream which is seek to the offset of file - * - * @param path - * @param fileType - * @param bufferSize - * @param offset - * @return DataInputStream - * @throws IOException - */ - public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize, - long offset) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataInputStream stream = fs.open(pt, bufferSize); - stream.seek(offset); - return new DataInputStream(new BufferedInputStream(stream)); - default: - FileInputStream fis = new FileInputStream(path); - long actualSkipSize = 0; - long skipSize = offset; - while (actualSkipSize != offset) { - actualSkipSize += fis.skip(skipSize); - skipSize = skipSize - actualSkipSize; - } - return new DataInputStream(new BufferedInputStream(fis)); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType) - throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = fs.create(pt, true); - return stream; - default: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize, - boolean append) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path, append), bufferSize)); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = null; - if (append) { - // append to a file only if file already exists else file not found - // exception will be thrown by hdfs - if (CarbonUtil.isFileExists(path)) { - stream = fs.append(pt, bufferSize); - } else { - stream = fs.create(pt, true, bufferSize); - } - } else { - stream = fs.create(pt, true, bufferSize); - } - return stream; - default: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize, - long blockSize) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = - fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize); - return stream; - default: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - } - } - - /** - * This method checks the given path exists or not and also is it file or - * not if the performFileCheck is true - * - * @param filePath - Path - * @param fileType - FileType Local/HDFS - * @param performFileCheck - Provide false for folders, true for files and - */ - public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck) - throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - if (performFileCheck) { - return fs.exists(path) && fs.isFile(path); - } else { - return fs.exists(path); - } - - case LOCAL: - default: - File defaultFile = new File(filePath); - - if (performFileCheck) { - return defaultFile.exists() && defaultFile.isFile(); - } else { - return defaultFile.exists(); - } - } - } - - /** - * This method checks the given path exists or not and also is it file or - * not if the performFileCheck is true - * - * @param filePath - Path - * @param fileType - FileType Local/HDFS - */ - public static boolean isFileExist(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.exists(path); - - case LOCAL: - default: - File defaultFile = new File(filePath); - return defaultFile.exists(); - } - } - - public static boolean createNewFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.createNewFile(path); - - case LOCAL: - default: - File file = new File(filePath); - return file.createNewFile(); - } - } - - public static boolean deleteFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.delete(path, true); - - case LOCAL: - default: - File file = new File(filePath); - return deleteAllFilesOfDir(file); - } - } - - public static boolean deleteAllFilesOfDir(File path) { - if (!path.exists()) { - return true; - } - if (path.isFile()) { - return path.delete(); - } - File[] files = path.listFiles(); - for (int i = 0; i < files.length; i++) { - deleteAllFilesOfDir(files[i]); - } - return path.delete(); - } - - public static boolean mkdirs(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.mkdirs(path); - case LOCAL: - default: - File file = new File(filePath); - return file.mkdirs(); - } - } - - /** - * for getting the dataoutput stream using the hdfs filesystem append API. - * - * @param path - * @param fileType - * @return - * @throws IOException - */ - public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType) - throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true))); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = fs.append(pt); - return stream; - default: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - } - } - - /** - * for creating a new Lock file and if it is successfully created - * then in case of abrupt shutdown then the stream to that file will be closed. - * - * @param filePath - * @param fileType - * @return - * @throws IOException - */ - public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - if (fs.createNewFile(path)) { - fs.deleteOnExit(path); - return true; - } - return false; - case LOCAL: - default: - File file = new File(filePath); - return file.createNewFile(); - } - } - - public enum FileType { - LOCAL, HDFS, ALLUXIO, VIEWFS - } - - /** - * below method will be used to update the file path - * for local type - * it removes the file:/ from the path - * - * @param filePath - * @return updated file path without url for local - */ - public static String getUpdatedFilePath(String filePath) { - FileType fileType = getFileType(filePath); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - return filePath; - case LOCAL: - default: - Path pathWithoutSchemeAndAuthority = - Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); - return pathWithoutSchemeAndAuthority.toString(); - } - } - - /** - * It computes size of directory - * - * @param filePath - * @return size in bytes - * @throws IOException - */ - public static long getDirectorySize(String filePath) throws IOException { - FileType fileType = getFileType(filePath); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.getContentSummary(path).getLength(); - case LOCAL: - default: - File file = new File(filePath); - return FileUtils.sizeOfDirectory(file); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java deleted file mode 100644 index 9a5c06a..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileHolderImpl.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.impl; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.FileHolder; - -public class FileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FileChannel> fileNameAndStreamCache; - - /** - * FileHolderImpl Constructor - * It will create the cache - */ - public FileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - public FileHolderImpl(int capacity) { - this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity); - } - - /** - * This method will be used to read the byte array from file based on offset - * and length(number of bytes) need to read - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length, offset); - return byteBffer.array(); - } - - /** - * This method will be used to close all the streams currently present in the cache - */ - @Override public void finish() throws IOException { - for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) { - FileChannel channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public int readInt(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @return read int - */ - @Override public int readInt(String filePath) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public long readDouble(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - private FileChannel updateCache(String filePath) throws FileNotFoundException { - FileChannel fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - FileInputStream stream = new FileInputStream(filePath); - fileChannel = stream.getChannel(); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.position(offset); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - - /** - * This method will be used to read the byte array from file based on length(number of bytes) - * - * @param filePath fully qualified file path - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length); - return byteBffer.array(); - } - - /** - * This method will be used to read long from file from postion(offset), here - * length will be always 8 bacause int byte size is 8 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read long - */ - @Override public long readLong(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java deleted file mode 100644 index ad1d62f..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.impl.data.compressed; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore; -import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionHolder; -import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel; -import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder; -import org.apache.carbondata.core.util.ValueCompressionUtil; - -public abstract class AbstractHeavyCompressedDoubleArrayDataStore - implements NodeMeasureDataStore //NodeMeasureDataStore<double[]> -{ - - private LogService LOGGER = - LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName()); - - /** - * values. - */ - protected ValueCompressionHolder[] values; - - /** - * compressionModel. - */ - protected WriterCompressModel compressionModel; - - /** - * type - */ - private char[] type; - - /** - * AbstractHeavyCompressedDoubleArrayDataStore constructor. - * - * @param compressionModel - */ - public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) { - this.compressionModel = compressionModel; - if (null != compressionModel) { - this.type = compressionModel.getType(); - values = - new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length]; - } - } - - // this method first invokes encoding routine to encode the data chunk, - // followed by invoking compression routine for preparing the data chunk for writing. - @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) { - byte[][] returnValue = new byte[values.length][]; - for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) { - values[i] = compressionModel.getValueCompressionHolder()[i]; - if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE) { - // first perform encoding of the data chunk - values[i].setValue( - ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i]) - .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i], - compressionModel.getMaxValue()[i], - compressionModel.getMantissa()[i])); - } else { - values[i].setValue(dataHolder[i].getWritableByteArrayValues()); - } - values[i].compress(); - returnValue[i] = values[i].getCompressedData(); - } - - return returnValue; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java deleted file mode 100644 index 0c29143..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.store.impl.data.compressed; - -import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel; - -public class HeavyCompressedDoubleArrayDataInMemoryStore - extends AbstractHeavyCompressedDoubleArrayDataStore { - - public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel compressionModel) { - super(compressionModel); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java deleted file mode 100644 index 7561ec7..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/util/StoreFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.util; - -import org.apache.carbondata.core.datastorage.store.NodeMeasureDataStore; -import org.apache.carbondata.core.datastorage.store.compression.WriterCompressModel; -import org.apache.carbondata.core.datastorage.store.impl.data.compressed.HeavyCompressedDoubleArrayDataInMemoryStore; - -public final class StoreFactory { - - private StoreFactory() { - } - - public static NodeMeasureDataStore createDataStore(WriterCompressModel compressionModel) { - return new HeavyCompressedDoubleArrayDataInMemoryStore(compressionModel); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java b/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java new file mode 100644 index 0000000..cf7bf21 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/AbstractBlockIndexStoreCache.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.datastore.block.AbstractIndex; +import org.apache.carbondata.core.datastore.block.BlockInfo; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.util.CarbonUtil; + +/** + * This class validate and load the B-Tree in the executor lru cache + * @param <K> cache key + * @param <V> Block Meta data details + */ +public abstract class AbstractBlockIndexStoreCache<K, V> + implements Cache<TableBlockUniqueIdentifier, AbstractIndex> { + /** + * carbon store path + */ + protected String carbonStorePath; + /** + * CarbonLRU cache + */ + protected CarbonLRUCache lruCache; + + /** + * table segment id vs blockInfo list + */ + protected Map<String, List<BlockInfo>> segmentIdToBlockListMap; + + + /** + * map of block info to lock object map, while loading the btree this will be filled + * and removed after loading the tree for that particular block info, this will be useful + * while loading the tree concurrently so only block level lock will be applied another + * block can be loaded concurrently + */ + protected Map<BlockInfo, Object> blockInfoLock; + + /** + * The object will hold the segment ID lock so that at a time only 1 block that belongs to same + * segment & table can create the list for holding the block info + */ + protected Map<String, Object> segmentIDLock; + + public AbstractBlockIndexStoreCache(String carbonStorePath, CarbonLRUCache lruCache) { + this.carbonStorePath = carbonStorePath; + this.lruCache = lruCache; + blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>(); + segmentIDLock= new ConcurrentHashMap<String, Object>(); + segmentIdToBlockListMap = new ConcurrentHashMap<>(); + } + + /** + * This method will get the value for the given key. If value does not exist + * for the given key, it will check and load the value. + * + * @param tableBlock + * @param tableBlockUniqueIdentifier + * @param lruCacheKey + */ + protected void checkAndLoadTableBlocks(AbstractIndex tableBlock, + TableBlockUniqueIdentifier tableBlockUniqueIdentifier, String lruCacheKey) + throws IOException { + // calculate the required size is + TableBlockInfo blockInfo = tableBlockUniqueIdentifier.getTableBlockInfo(); + long requiredMetaSize = CarbonUtil.calculateMetaSize(blockInfo); + if (requiredMetaSize > 0) { + tableBlock.setMemorySize(requiredMetaSize); + tableBlock.incrementAccessCount(); + boolean isTableBlockAddedToLruCache = lruCache.put(lruCacheKey, tableBlock, requiredMetaSize); + // if column is successfully added to lru cache then only load the + // table blocks data + if (isTableBlockAddedToLruCache) { + // load table blocks data + // getting the data file meta data of the block + DataFileFooter footer = CarbonUtil.readMetadatFile(blockInfo); + footer.setBlockInfo(new BlockInfo(blockInfo)); + // building the block + tableBlock.buildIndex(Collections.singletonList(footer)); + } else { + throw new IndexBuilderException( + "Cannot load table blocks into memory. Not enough memory available"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java new file mode 100644 index 0000000..f1b0bf7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/BTreeBuilderInfo.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.datastore; + +import java.util.List; + +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; + +/** + * below class holds the meta data requires to build the blocks + */ +public class BTreeBuilderInfo { + + /** + * holds all the information about data + * file meta data + */ + private List<DataFileFooter> footerList; + + /** + * size of the each column value size + * this will be useful for reading + */ + private int[] dimensionColumnValueSize; + + public BTreeBuilderInfo(List<DataFileFooter> footerList, + int[] dimensionColumnValueSize) { + this.dimensionColumnValueSize = dimensionColumnValueSize; + this.footerList = footerList; + } + + /** + * @return the eachDimensionBlockSize + */ + public int[] getDimensionColumnValueSize() { + return dimensionColumnValueSize; + } + + /** + * @return the footerList + */ + public List<DataFileFooter> getFooterList() { + return footerList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java new file mode 100644 index 0000000..8e93a5b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.CarbonLRUCache; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.AbstractIndex; +import org.apache.carbondata.core.datastore.block.BlockIndex; +import org.apache.carbondata.core.datastore.block.BlockInfo; +import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier; +import org.apache.carbondata.core.datastore.exception.IndexBuilderException; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonProperties; + +/** + * This class is used to load the B-Tree in Executor LRU Cache + */ +public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> { + + /** + * LOGGER instance + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockIndexStore.class.getName()); + public BlockIndexStore(String carbonStorePath, CarbonLRUCache lruCache) { + super(carbonStorePath, lruCache); + } + + /** + * The method loads the block meta in B-tree lru cache and returns the block meta. + * + * @param tableBlockUniqueIdentifier Uniquely identifies the block + * @return returns the blocks B-Tree meta + */ + @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) + throws IOException { + TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo(); + BlockInfo blockInfo = new BlockInfo(tableBlockInfo); + String lruCacheKey = + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo); + AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey); + + // if block is not loaded + if (null == tableBlock) { + // check any lock object is present in + // block info lock map + Object blockInfoLockObject = blockInfoLock.get(blockInfo); + // if lock object is not present then acquire + // the lock in block info lock and add a lock object in the map for + // particular block info, added double checking mechanism to add the lock + // object so in case of concurrent query we for same block info only one lock + // object will be added + if (null == blockInfoLockObject) { + synchronized (blockInfoLock) { + // again checking the block info lock, to check whether lock object is present + // or not if now also not present then add a lock object + blockInfoLockObject = blockInfoLock.get(blockInfo); + if (null == blockInfoLockObject) { + blockInfoLockObject = new Object(); + blockInfoLock.put(blockInfo, blockInfoLockObject); + } + } + } + //acquire the lock for particular block info + synchronized (blockInfoLockObject) { + // check again whether block is present or not to avoid the + // same block is loaded + //more than once in case of concurrent query + tableBlock = (AbstractIndex) lruCache.get( + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); + // if still block is not present then load the block + if (null == tableBlock) { + tableBlock = loadBlock(tableBlockUniqueIdentifier); + fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), + blockInfo); + } + } + } else { + tableBlock.incrementAccessCount(); + } + return tableBlock; + } + + /** + * @param absoluteTableIdentifier + * @param blockInfo + */ + private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier, + BlockInfo blockInfo) { + TableSegmentUniqueIdentifier segmentIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, + blockInfo.getTableBlockInfo().getSegmentId()); + String uniqueTableSegmentIdentifier = segmentIdentifier.getUniqueTableSegmentIdentifier(); + List<BlockInfo> blockInfos = + segmentIdToBlockListMap.get(uniqueTableSegmentIdentifier); + if (null == blockInfos) { + Object segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier); + if (null == segmentLockObject) { + synchronized (segmentIDLock) { + segmentLockObject = segmentIDLock.get(uniqueTableSegmentIdentifier); + if (null == segmentLockObject) { + segmentLockObject = new Object(); + segmentIDLock.put(uniqueTableSegmentIdentifier, segmentLockObject); + } + } + } + synchronized (segmentLockObject) { + blockInfos = + segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier()); + if (null == blockInfos) { + blockInfos = new CopyOnWriteArrayList<>(); + segmentIdToBlockListMap.put(uniqueTableSegmentIdentifier, blockInfos); + } + blockInfos.add(blockInfo); + } + } else { + blockInfos.add(blockInfo); + } + } + + /** + * The method takes list of tableblocks as input and load them in btree lru cache + * and returns the list of data blocks meta + * + * @param tableBlocksInfos List of unique table blocks + * @return List<AbstractIndex> + * @throws IndexBuilderException + */ + @Override public List<AbstractIndex> getAll(List<TableBlockUniqueIdentifier> tableBlocksInfos) + throws IndexBuilderException { + AbstractIndex[] loadedBlock = new AbstractIndex[tableBlocksInfos.size()]; + int numberOfCores = 1; + try { + numberOfCores = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.NUM_CORES, + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); + } catch (NumberFormatException e) { + numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); + } + ExecutorService executor = Executors.newFixedThreadPool(numberOfCores); + List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>(); + for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : tableBlocksInfos) { + blocksList.add(executor.submit(new BlockLoaderThread(tableBlockUniqueIdentifier))); + } + // shutdown the executor gracefully and wait until all the task is finished + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.HOURS); + } catch (InterruptedException e) { + throw new IndexBuilderException(e); + } + // fill the block which were not loaded before to loaded blocks array + fillLoadedBlocks(loadedBlock, blocksList); + return Arrays.asList(loadedBlock); + } + + private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier, + BlockInfo blockInfo) { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE + + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + blockInfo + .getBlockUniqueName(); + } + + /** + * method returns the B-Tree meta + * + * @param tableBlockUniqueIdentifier Unique table block info + * @return + */ + @Override public AbstractIndex getIfPresent( + TableBlockUniqueIdentifier tableBlockUniqueIdentifier) { + BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo()); + BlockIndex cacheable = (BlockIndex) lruCache + .get(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); + if (null != cacheable) { + cacheable.incrementAccessCount(); + } + return cacheable; + } + + /** + * the method removes the entry from cache. + * + * @param tableBlockUniqueIdentifier + */ + @Override public void invalidate(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) { + BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo()); + lruCache + .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); + } + + @Override public void clearAccessCount(List<TableBlockUniqueIdentifier> keys) { + for (TableBlockUniqueIdentifier tableBlockUniqueIdentifier : keys) { + SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache + .get(tableBlockUniqueIdentifier.getUniqueTableBlockName()); + cacheable.clear(); + } + } + + /** + * Below method will be used to fill the loaded blocks to the array + * which will be used for query execution + * + * @param loadedBlockArray array of blocks which will be filled + * @param blocksList blocks loaded in thread + * @throws IndexBuilderException in case of any failure + */ + private void fillLoadedBlocks(AbstractIndex[] loadedBlockArray, + List<Future<AbstractIndex>> blocksList) throws IndexBuilderException { + int blockCounter = 0; + boolean exceptionOccurred = false; + Throwable exceptionRef = null; + for (int i = 0; i < loadedBlockArray.length; i++) { + try { + loadedBlockArray[i] = blocksList.get(blockCounter++).get(); + } catch (Throwable e) { + exceptionOccurred = true; + exceptionRef = e; + } + } + if (exceptionOccurred) { + LOGGER.error("Block B-tree loading failed. Clearing the access count of the loaded blocks."); + // in case of any failure clear the access count for the valid loaded blocks + clearAccessCountForLoadedBlocks(loadedBlockArray); + throw new IndexBuilderException("Block B-tree loading failed", exceptionRef); + } + } + + /** + * This method will clear the access count for the loaded blocks + * + * @param loadedBlockArray + */ + private void clearAccessCountForLoadedBlocks(AbstractIndex[] loadedBlockArray) { + for (int i = 0; i < loadedBlockArray.length; i++) { + if (null != loadedBlockArray[i]) { + loadedBlockArray[i].clear(); + } + } + } + + /** + * Thread class which will be used to load the blocks + */ + private class BlockLoaderThread implements Callable<AbstractIndex> { + // table block unique identifier + private TableBlockUniqueIdentifier tableBlockUniqueIdentifier; + + private BlockLoaderThread(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) { + this.tableBlockUniqueIdentifier = tableBlockUniqueIdentifier; + } + + @Override public AbstractIndex call() throws Exception { + // load and return the loaded blocks + return get(tableBlockUniqueIdentifier); + } + } + + private AbstractIndex loadBlock(TableBlockUniqueIdentifier tableBlockUniqueIdentifier) + throws IOException { + AbstractIndex tableBlock = new BlockIndex(); + BlockInfo blockInfo = new BlockInfo(tableBlockUniqueIdentifier.getTableBlockInfo()); + String lruCacheKey = + getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo); + checkAndLoadTableBlocks(tableBlock, tableBlockUniqueIdentifier, lruCacheKey); + // finally remove the lock object from block info lock as once block is loaded + // it will not come inside this if condition + blockInfoLock.remove(blockInfo); + return tableBlock; + } + + /** + * This will be used to remove a particular blocks useful in case of + * deletion of some of the blocks in case of retention or may be some other + * scenario + * + * @param segmentIds list of table blocks to be removed + * @param absoluteTableIdentifier absolute table identifier + */ + public void removeTableBlocks(List<String> segmentIds, + AbsoluteTableIdentifier absoluteTableIdentifier) { + if (null == segmentIds) { + return; + } + for (String segmentId : segmentIds) { + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); + List<BlockInfo> blockInfos = segmentIdToBlockListMap + .remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + if (null != blockInfos) { + for (BlockInfo blockInfo : blockInfos) { + String lruCacheKey = getLruCacheKey(absoluteTableIdentifier, blockInfo); + lruCache.remove(lruCacheKey); + } + } + } + } + + /** + * remove TableBlocks executer level If Horizontal Compaction Done + * @param queryModel + */ + public void removeTableBlocksIfHorizontalCompactionDone(QueryModel queryModel) { + // get the invalid segments blocks details + Map<String, UpdateVO> invalidBlocksVO = queryModel.getInvalidBlockVOForSegmentId(); + if (!invalidBlocksVO.isEmpty()) { + UpdateVO updateMetadata; + Iterator<Map.Entry<String, UpdateVO>> itr = invalidBlocksVO.entrySet().iterator(); + String blockTimestamp = null; + while (itr.hasNext()) { + Map.Entry<String, UpdateVO> entry = itr.next(); + TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = + new TableSegmentUniqueIdentifier(queryModel.getAbsoluteTableIdentifier(), + entry.getKey()); + List<BlockInfo> blockInfos = segmentIdToBlockListMap + .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); + if (null != blockInfos) { + for (BlockInfo blockInfo : blockInfos) { + // reading the updated block names from status manager instance + blockTimestamp = blockInfo.getBlockUniqueName() + .substring(blockInfo.getBlockUniqueName().lastIndexOf('-') + 1, + blockInfo.getBlockUniqueName().length()); + updateMetadata = entry.getValue(); + if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(Long.parseLong(blockTimestamp))) { + Long blockTimeStamp = Long.parseLong(blockTimestamp); + if (blockTimeStamp > updateMetadata.getFactTimestamp() && ( + updateMetadata.getUpdateDeltaStartTimestamp() != null + && blockTimeStamp < updateMetadata.getUpdateDeltaStartTimestamp())) { + String lruCacheKey = + getLruCacheKey(queryModel.getAbsoluteTableIdentifier(), blockInfo); + lruCache.remove(lruCacheKey); + } + } + } + } + } + } + } +}