http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
index 8c8d08f..a689d8e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
@@ -124,22 +124,22 @@ public class UnsafeFixedLengthDimensionDataChunkStore
   /**
    * to compare the two byte array
    *
-   * @param index index of first byte array
+   * @param rowId index of first byte array
    * @param compareValue value of to be compared
    * @return compare result
    */
-  @Override public int compareTo(int index, byte[] compareValue) {
+  @Override public int compareTo(int rowId, byte[] compareValue) {
     // based on index we need to calculate the actual position in memory block
-    index = index * columnValueSize;
+    rowId = rowId * columnValueSize;
     int compareResult = 0;
     for (int i = 0; i < compareValue.length; i++) {
       compareResult = (CarbonUnsafe.getUnsafe()
-          .getByte(dataPageMemoryBlock.getBaseObject(), dataPageMemoryBlock.getBaseOffset() + index)
+          .getByte(dataPageMemoryBlock.getBaseObject(), dataPageMemoryBlock.getBaseOffset() + rowId)
           & 0xff) - (compareValue[i] & 0xff);
       if (compareResult != 0) {
         break;
       }
-      index++;
+      rowId++;
     }
     return compareResult;
   }
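[Editor's note] The rename from index to rowId in both unsafe chunk stores makes the addressing explicit: the row index is scaled to a byte offset (by columnValueSize here, or resolved through the int pointer table in the variable-length store below), and the stored bytes are then compared against compareValue as unsigned values. The following is a minimal, self-contained sketch of that unsigned lexicographic comparison over an ordinary on-heap byte[] page; the page layout and all names here are illustrative assumptions, while the real class reads off-heap memory through CarbonUnsafe.

// Sketch: unsigned lexicographic compare of one fixed-length row against a search key.
// Assumes a hypothetical on-heap page laid out as consecutive fixed-size values;
// the actual store walks an off-heap memory block via CarbonUnsafe.getUnsafe().getByte().
public final class FixedLengthCompareSketch {

  /** Compare the value stored at rowId inside 'page' with 'compareValue', byte by byte. */
  static int compareTo(byte[] page, int columnValueSize, int rowId, byte[] compareValue) {
    int offset = rowId * columnValueSize;  // row index -> byte offset, as in the patch
    for (int i = 0; i < compareValue.length; i++) {
      // '& 0xff' widens each byte to its unsigned value, so 0x80..0xff sort above 0x00..0x7f
      int diff = (page[offset + i] & 0xff) - (compareValue[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    byte[] page = { 0x01, (byte) 0x90, 0x02, 0x10 };  // two rows of 2 bytes each
    System.out.println(compareTo(page, 2, 0, new byte[] { 0x01, 0x7f }));  // > 0: 0x90 unsigned > 0x7f
    System.out.println(compareTo(page, 2, 1, new byte[] { 0x02, 0x10 }));  // 0: rows are equal
  }
}

The '& 0xff' widening is what keeps key bytes with the high bit set (0x80-0xff) sorting above 0x00-0x7f; a plain signed byte subtraction would invert that order.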
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index 36b2bd8..e1eb378 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -189,11 +189,11 @@ public class UnsafeVariableLengthDimesionDataChunkStore
   /**
    * to compare the two byte array
    *
-   * @param index index of first byte array
+   * @param rowId index of first byte array
    * @param compareValue value of to be compared
    * @return compare result
    */
-  @Override public int compareTo(int index, byte[] compareValue) {
+  @Override public int compareTo(int rowId, byte[] compareValue) {
     // now to get the row from memory block we need to do following thing
     // 1. first get the current offset
     // 2. if it's not a last row- get the next row offset
@@ -201,13 +201,13 @@
     // else subtract the current row offset
     // with complete data length get the offset of set of data
     int currentDataOffset = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long)index
+        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long) rowId
            * CarbonCommonConstants.INT_SIZE_IN_BYTE * 1L));
     short length = 0;
     // calculating the length of data
-    if (index < numberOfRows - 1) {
+    if (rowId < numberOfRows - 1) {
       int OffsetOfNextdata = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((index + 1)
+          dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((rowId + 1)
              * CarbonCommonConstants.INT_SIZE_IN_BYTE));
       length = (short) (OffsetOfNextdata - (currentDataOffset
           + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
index 74d268a..e2a4161 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
@@ -29,13 +29,6 @@ public class ColumnGroupModel {
   private int noOfColumnsStore;

   /**
-   * whether given index is columnar or not
-   * true: columnar
-   * false: row block
-   */
-  private boolean[] columnarStore;
-
-  /**
    * column groups
    * e.g
    * {{0,1,2},3,4,{5,6}}
@@ -77,15 +70,6 @@ public class ColumnGroupModel {
   }

   /**
-   * it's an identifier for row block or single column block
-   *
-   * @param columnarStore
-   */
-  public void setColumnarStore(boolean[] columnarStore) {
-    this.columnarStore = columnarStore;
-  }
-
-  /**
    * set
column groups * * @param columnGroups @@ -95,16 +79,6 @@ public class ColumnGroupModel { } /** - * check if given column group is columnar - * - * @param colGroup - * @return true if given block is columnar - */ - public boolean isColumnar(int colGroup) { - return columnarStore[colGroup]; - } - - /** * @return columngroups */ public int[][] getColumnGroup() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java deleted file mode 100644 index 182c8eb..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.datastore.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.FileHolder; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public class DFSFileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FSDataInputStream> fileNameAndStreamCache; - - private String queryId; - - private boolean readPageByPage; - - - public DFSFileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length, offset); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - public FSDataInputStream updateCache(String filePath) throws IOException { - FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - Path pt = new Path(filePath); - FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration()); - fileChannel = fs.open(pt); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read 
and position - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException { - byte[] byteBffer = new byte[size]; - channel.seek(offset); - channel.readFully(byteBffer); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and position - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size) throws IOException { - byte[] byteBffer = new byte[size]; - channel.readFully(byteBffer); - return byteBffer; - } - - @Override public int readInt(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readInt(); - } - - @Override public long readDouble(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public void finish() throws IOException { - for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) { - FSDataInputStream channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length); - } - - @Override public long readLong(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public int readInt(String filePath) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return fileChannel.readInt(); - } - - @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length) - throws IOException { - byte[] readByteArray = readByteArray(filePath, offset, length); - ByteBuffer byteBuffer = ByteBuffer.wrap(readByteArray); - byteBuffer.rewind(); - return byteBuffer; - } - - @Override public void setQueryId(String queryId) { - this.queryId = queryId; - } - - @Override public String getQueryId() { - return queryId; - } - - @Override public void setReadPageByPage(boolean isReadPageByPage) { - this.readPageByPage = isReadPageByPage; - } - - @Override public boolean isReadPageByPage() { - return readPageByPage; - } - - public Map<String, FSDataInputStream> getFileNameAndStreamCache() { - return fileNameAndStreamCache; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java new file mode 100644 index 0000000..1a0cd41 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datastore.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.FileReader; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class DFSFileReaderImpl implements FileReader { + /** + * cache to hold filename and its stream + */ + private Map<String, FSDataInputStream> fileNameAndStreamCache; + + private boolean readPageByPage; + + public DFSFileReaderImpl() { + this.fileNameAndStreamCache = + new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + } + + @Override public byte[] readByteArray(String filePath, long offset, int length) + throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + return read(fileChannel, length, offset); + } + + /** + * This method will be used to check whether stream is already present in + * cache or not for filepath if not present then create it and then add to + * cache, other wise get from cache + * + * @param filePath fully qualified file path + * @return channel + */ + private FSDataInputStream updateCache(String filePath) throws IOException { + FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath); + if (null == fileChannel) { + Path pt = new Path(filePath); + FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration()); + fileChannel = fs.open(pt); + fileNameAndStreamCache.put(filePath, fileChannel); + } + return fileChannel; + } + + /** + * This method will be used to read from file based on number of bytes to be read and position + * + * @param channel file channel + * @param size number of bytes + * @param offset position + * @return byte buffer + */ + private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException { + byte[] byteBffer = new byte[size]; + channel.seek(offset); + channel.readFully(byteBffer); + return byteBffer; + } + + /** + * This method will be used to read from file based on number of bytes to be read and position + * + * @param channel file channel + * @param size number of bytes + * @return byte buffer + */ + private byte[] read(FSDataInputStream channel, int size) throws IOException { + byte[] byteBffer = new byte[size]; + channel.readFully(byteBffer); + return byteBffer; + } + + @Override public int readInt(String filePath, long offset) throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + fileChannel.seek(offset); + return fileChannel.readInt(); + } + + @Override public long readDouble(String filePath, long offset) throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + fileChannel.seek(offset); + return fileChannel.readLong(); + } + + @Override public void finish() throws 
IOException { + for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) { + FSDataInputStream channel = entry.getValue(); + if (null != channel) { + channel.close(); + } + } + } + + @Override public byte[] readByteArray(String filePath, int length) throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + return read(fileChannel, length); + } + + @Override public long readLong(String filePath, long offset) throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + fileChannel.seek(offset); + return fileChannel.readLong(); + } + + @Override public int readInt(String filePath) throws IOException { + FSDataInputStream fileChannel = updateCache(filePath); + return fileChannel.readInt(); + } + + @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length) + throws IOException { + byte[] readByteArray = readByteArray(filePath, offset, length); + ByteBuffer byteBuffer = ByteBuffer.wrap(readByteArray); + byteBuffer.rewind(); + return byteBuffer; + } + + @Override public void setReadPageByPage(boolean isReadPageByPage) { + this.readPageByPage = isReadPageByPage; + } + + @Override public boolean isReadPageByPage() { + return readPageByPage; + } + + public Map<String, FSDataInputStream> getFileNameAndStreamCache() { + return fileNameAndStreamCache; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java index 67648fe..b58a473 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java @@ -17,24 +17,28 @@ package org.apache.carbondata.core.datastore.impl; -import org.apache.carbondata.core.datastore.FileHolder; -import org.apache.carbondata.core.datastore.filesystem.*; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile; +import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile; +import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile; import org.apache.hadoop.conf.Configuration; public class DefaultFileTypeProvider implements FileTypeInerface { - public FileHolder getFileHolder(FileFactory.FileType fileType) { + public FileReader getFileHolder(FileFactory.FileType fileType) { switch (fileType) { case LOCAL: - return new FileHolderImpl(); + return new FileReaderImpl(); case HDFS: case ALLUXIO: case VIEWFS: case S3: - return new DFSFileHolderImpl(); + return new DFSFileReaderImpl(); default: - return new FileHolderImpl(); + return new FileReaderImpl(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index f141991..9bcdfae 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -28,7 +28,7 @@ import java.nio.channels.FileChannel; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.commons.io.FileUtils; @@ -62,7 +62,7 @@ public final class FileFactory { return configuration; } - public static FileHolder getFileHolder(FileType fileType) { + public static FileReader getFileHolder(FileType fileType) { return fileFileTypeInerface.getFileHolder(fileType); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java deleted file mode 100644 index cc589b7..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.core.datastore.impl; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.FileHolder; - -public class FileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FileChannel> fileNameAndStreamCache; - private String queryId; - - private boolean readPageByPage; - - /** - * FileHolderImpl Constructor - * It will create the cache - */ - public FileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - public FileHolderImpl(int capacity) { - this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity); - } - - /** - * This method will be used to read the byte array from file based on offset - * and length(number of bytes) need to read - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length, offset); - return byteBffer.array(); - } - - /** - * This method will be used to close all the streams currently present in the cache - */ - @Override public void finish() throws IOException { - for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) { - FileChannel channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public int readInt(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @return read int - */ - @Override public int readInt(String filePath) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public long readDouble(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other 
wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - private FileChannel updateCache(String filePath) throws FileNotFoundException { - FileChannel fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - FileInputStream stream = new FileInputStream(filePath); - fileChannel = stream.getChannel(); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read and position - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.position(offset); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and position - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - - /** - * This method will be used to read the byte array from file based on length(number of bytes) - * - * @param filePath fully qualified file path - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length); - return byteBffer.array(); - } - - /** - * This method will be used to read long from file from postion(offset), here - * length will be always 8 bacause int byte size is 8 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read long - */ - @Override public long readLong(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - - @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length) - throws IOException { - ByteBuffer byteBuffer = ByteBuffer.allocate(length); - FileChannel fileChannel = updateCache(filePath); - fileChannel.position(offset); - fileChannel.read(byteBuffer); - byteBuffer.rewind(); - return byteBuffer; - } - - @Override public void setQueryId(String queryId) { - this.queryId = queryId; - } - - @Override public String getQueryId() { - return queryId; - } - - @Override public void setReadPageByPage(boolean isReadPageByPage) { - this.readPageByPage = isReadPageByPage; - } - - @Override public boolean isReadPageByPage() { - return readPageByPage; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java new file mode 100644 index 0000000..6fef278 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java @@ -0,0 +1,215 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.impl; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.FileReader; + +public class FileReaderImpl implements FileReader { + /** + * cache to hold filename and its stream + */ + private Map<String, FileChannel> fileNameAndStreamCache; + + private boolean readPageByPage; + + /** + * FileReaderImpl Constructor + * It will create the cache + */ + public FileReaderImpl() { + this.fileNameAndStreamCache = + new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + } + + public FileReaderImpl(int capacity) { + this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity); + } + + /** + * This method will be used to read the byte array from file based on offset + * and length(number of bytes) need to read + * + * @param filePath fully qualified file path + * @param offset reading start position, + * @param length number of bytes to be read + * @return read byte array + */ + @Override public byte[] readByteArray(String filePath, long offset, int length) + throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, length, offset); + return byteBffer.array(); + } + + /** + * This method will be used to close all the streams currently present in the cache + */ + @Override public void finish() throws IOException { + for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) { + FileChannel channel = entry.getValue(); + if (null != channel) { + channel.close(); + } + } + } + + /** + * This method will be used to read int from file from postion(offset), here + * length will be always 4 bacause int byte size if 4 + * + * @param filePath fully qualified file path + * @param offset reading start position, + * @return read int + */ + @Override public int readInt(String filePath, long offset) throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset); + return byteBffer.getInt(); + } + + /** + * This method will be used to read int from file from postion(offset), here + * length will be always 4 bacause int byte size if 4 + * + * @param filePath fully qualified file path + * @return read int + */ + @Override public int readInt(String filePath) throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, 
CarbonCommonConstants.INT_SIZE_IN_BYTE); + return byteBffer.getInt(); + } + + /** + * This method will be used to read int from file from postion(offset), here + * length will be always 4 bacause int byte size if 4 + * + * @param filePath fully qualified file path + * @param offset reading start position, + * @return read int + */ + @Override public long readDouble(String filePath, long offset) throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); + return byteBffer.getLong(); + } + + /** + * This method will be used to check whether stream is already present in + * cache or not for filepath if not present then create it and then add to + * cache, other wise get from cache + * + * @param filePath fully qualified file path + * @return channel + */ + private FileChannel updateCache(String filePath) throws FileNotFoundException { + FileChannel fileChannel = fileNameAndStreamCache.get(filePath); + if (null == fileChannel) { + FileInputStream stream = new FileInputStream(filePath); + fileChannel = stream.getChannel(); + fileNameAndStreamCache.put(filePath, fileChannel); + } + return fileChannel; + } + + /** + * This method will be used to read from file based on number of bytes to be read and position + * + * @param channel file channel + * @param size number of bytes + * @param offset position + * @return byte buffer + */ + private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException { + ByteBuffer byteBffer = ByteBuffer.allocate(size); + channel.position(offset); + channel.read(byteBffer); + byteBffer.rewind(); + return byteBffer; + } + + /** + * This method will be used to read from file based on number of bytes to be read and position + * + * @param channel file channel + * @param size number of bytes + * @return byte buffer + */ + private ByteBuffer read(FileChannel channel, int size) throws IOException { + ByteBuffer byteBffer = ByteBuffer.allocate(size); + channel.read(byteBffer); + byteBffer.rewind(); + return byteBffer; + } + + + /** + * This method will be used to read the byte array from file based on length(number of bytes) + * + * @param filePath fully qualified file path + * @param length number of bytes to be read + * @return read byte array + */ + @Override public byte[] readByteArray(String filePath, int length) throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, length); + return byteBffer.array(); + } + + /** + * This method will be used to read long from file from postion(offset), here + * length will be always 8 bacause int byte size is 8 + * + * @param filePath fully qualified file path + * @param offset reading start position, + * @return read long + */ + @Override public long readLong(String filePath, long offset) throws IOException { + FileChannel fileChannel = updateCache(filePath); + ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); + return byteBffer.getLong(); + } + + @Override public ByteBuffer readByteBuffer(String filePath, long offset, int length) + throws IOException { + ByteBuffer byteBuffer = ByteBuffer.allocate(length); + FileChannel fileChannel = updateCache(filePath); + fileChannel.position(offset); + fileChannel.read(byteBuffer); + byteBuffer.rewind(); + return byteBuffer; + } + + @Override public void setReadPageByPage(boolean isReadPageByPage) { + this.readPageByPage = isReadPageByPage; + } + + @Override public 
boolean isReadPageByPage() { + return readPageByPage; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java index 4676278..413261c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java @@ -17,14 +17,14 @@ package org.apache.carbondata.core.datastore.impl; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.hadoop.conf.Configuration; public interface FileTypeInerface { - FileHolder getFileHolder(FileFactory.FileType fileType); + FileReader getFileHolder(FileFactory.FileType fileType); CarbonFile getCarbonFile(String path, FileFactory.FileType fileType); CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration); http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java index 19b1f1c..fe4cf83 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/AbstractBTreeLeafNode.java @@ -18,9 +18,8 @@ package org.apache.carbondata.core.datastore.impl.btree; import java.io.IOException; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.datastore.DataRefNode; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; @@ -31,38 +30,31 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; public abstract class AbstractBTreeLeafNode implements BTreeNode { /** - * Below method will be used to load the data block - * - * @param blockInfo block detail - */ - protected BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache; - - /** * number of keys in a btree */ - protected int numberOfKeys; + int numberOfKeys; /** * node number */ - protected long nodeNumber; + long nodeNumber; /** * Next node of the leaf */ - protected BTreeNode nextNode; + private BTreeNode nextNode; /** * max key of the column this will be used to check whether this leaf will * be used for scanning or not */ - protected byte[][] maxKeyOfColumns; + byte[][] maxKeyOfColumns; /** * min key of the column this will be used to check whether this leaf will * be used for scanning or not */ - protected byte[][] minKeyOfColumns; + byte[][] minKeyOfColumns; /** * Method to get the next block this can be used while scanning when @@ -70,7 +62,7 @@ public 
abstract class AbstractBTreeLeafNode implements BTreeNode { * * @return next block */ - @Override public int nodeSize() { + @Override public int numRows() { return this.numberOfKeys; } @@ -109,7 +101,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode { * * @return block number */ - @Override public long nodeNumber() { + @Override public long nodeIndex() { return nodeNumber; } @@ -174,11 +166,11 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode { * Below method will be used to get the dimension chunks * * @param fileReader file reader to read the chunks from file - * @param blockIndexes indexes of the blocks need to be read + * @param columnIndexRange indexes of the blocks need to be read * @return dimension data chunks */ - @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, - int[][] blockIndexes) throws IOException { + @Override public DimensionRawColumnChunk[] readDimensionChunks(FileReader fileReader, + int[][] columnIndexRange) throws IOException { // No required here as leaf which will will be use this class will implement its own get // dimension chunks return null; @@ -188,11 +180,11 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode { * Below method will be used to get the dimension chunk * * @param fileReader file reader to read the chunk from file - * @param blockIndex block index to be read + * @param columnIndex block index to be read * @return dimension data chunk */ - @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, - int blockIndex) throws IOException { + @Override public DimensionRawColumnChunk readDimensionChunk(FileReader fileReader, + int columnIndex) throws IOException { // No required here as leaf which will will be use this class will implement // its own get dimension chunks return null; @@ -202,11 +194,11 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode { * Below method will be used to get the measure chunk * * @param fileReader file reader to read the chunk from file - * @param blockIndexes block indexes to be read from file + * @param columnIndexRange block indexes to be read from file * @return measure column data chunk */ - @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, - int[][] blockIndexes) throws IOException { + @Override public MeasureRawColumnChunk[] readMeasureChunks(FileReader fileReader, + int[][] columnIndexRange) throws IOException { // No required here as leaf which will will be use this class will implement its own get // measure chunks return null; @@ -216,30 +208,16 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode { * Below method will be used to read the measure chunk * * @param fileReader file read to read the file chunk - * @param blockIndex block index to be read from file + * @param columnIndex block index to be read from file * @return measure data chunk */ - @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) + @Override public MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) throws IOException { // No required here as leaf which will will be use this class will implement its own get // measure chunks return null; } - /** - * @param deleteDeltaDataCache - */ - public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) { - - this.deleteDeltaDataCache = deleteDeltaDataCache; - } - /** - * @return the segmentProperties - */ - public 
BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { - return deleteDeltaDataCache; - } - @Override public int getPageRowCount(int pageNumber) { throw new UnsupportedOperationException("Unsupported operation"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeDataRefNodeFinder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeDataRefNodeFinder.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeDataRefNodeFinder.java index 2f8aadf..688d56a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeDataRefNodeFinder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeDataRefNodeFinder.java @@ -106,7 +106,7 @@ public class BTreeDataRefNodeFinder implements DataRefNodeFinder { private BTreeNode findFirstLeafNode(IndexKey key, BTreeNode node) { int childNodeIndex; int low = 0; - int high = node.nodeSize() - 1; + int high = node.numRows() - 1; int mid = 0; int compareRes = -1; IndexKey[] nodeKeys = node.getNodeKeys(); @@ -156,7 +156,7 @@ public class BTreeDataRefNodeFinder implements DataRefNodeFinder { private BTreeNode findLastLeafNode(IndexKey key, BTreeNode node) { int childNodeIndex; int low = 0; - int high = node.nodeSize() - 1; + int high = node.numRows() - 1; int mid = 0; int compareRes = -1; IndexKey[] nodeKeys = node.getNodeKeys(); @@ -172,7 +172,7 @@ public class BTreeDataRefNodeFinder implements DataRefNodeFinder { } else { int currentPos = mid; // if key is matched then get the first entry - while (currentPos + 1 < node.nodeSize() + while (currentPos + 1 < node.numRows() && compareIndexes(key, nodeKeys[currentPos + 1]) == 0) { currentPos++; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java index ccc5e12..c200f8d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BTreeNonLeafNode.java @@ -19,10 +19,9 @@ package org.apache.carbondata.core.datastore.impl.btree; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.DataRefNode; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; @@ -34,13 +33,6 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; public class BTreeNonLeafNode implements BTreeNode { /** - * Below method will be used to load the data block - * - * @param blockInfo block detail - */ - protected BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache; - - /** * Child nodes */ private BTreeNode[] children; @@ -50,7 +42,7 
@@ public class BTreeNonLeafNode implements BTreeNode { */ private List<IndexKey> listOfKeys; - public BTreeNonLeafNode() { + BTreeNonLeafNode() { // creating a list which will store all the indexes listOfKeys = new ArrayList<IndexKey>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); } @@ -120,7 +112,7 @@ public class BTreeNonLeafNode implements BTreeNode { * * @return number of keys in the block */ - @Override public int nodeSize() { + @Override public int numRows() { return listOfKeys.size(); } @@ -131,11 +123,11 @@ public class BTreeNonLeafNode implements BTreeNode { * * @return block number */ - @Override public long nodeNumber() { + @Override public long nodeIndex() { throw new UnsupportedOperationException("Unsupported operation"); } - @Override public String blockletId() { + @Override public short blockletIndex() { throw new UnsupportedOperationException("Unsupported operation"); } @@ -171,11 +163,11 @@ public class BTreeNonLeafNode implements BTreeNode { * Below method will be used to get the dimension chunks * * @param fileReader file reader to read the chunks from file - * @param blockIndexes indexes of the blocks need to be read + * @param columnIndexRange indexes of the blocks need to be read * @return dimension data chunks */ - @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, - int[][] blockIndexes) { + @Override public DimensionRawColumnChunk[] readDimensionChunks(FileReader fileReader, + int[][] columnIndexRange) { // operation of getting the dimension chunks is not supported as its a // non leaf node @@ -191,8 +183,8 @@ public class BTreeNonLeafNode implements BTreeNode { * @param fileReader file reader to read the chunk from file * @return dimension data chunk */ - @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, - int blockIndexes) { + @Override public DimensionRawColumnChunk readDimensionChunk(FileReader fileReader, + int columnIndex) { // operation of getting the dimension chunk is not supported as its a // non leaf node // and in case of B+Tree data will be stored only in leaf node and @@ -205,11 +197,11 @@ public class BTreeNonLeafNode implements BTreeNode { * Below method will be used to get the measure chunk * * @param fileReader file reader to read the chunk from file - * @param blockIndexes block indexes to be read from file + * @param columnIndexRange block indexes to be read from file * @return measure column data chunk */ - @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, - int[][] blockIndexes) { + @Override public MeasureRawColumnChunk[] readMeasureChunks(FileReader fileReader, + int[][] columnIndexRange) { // operation of getting the measure chunk is not supported as its a non // leaf node // and in case of B+Tree data will be stored only in leaf node and @@ -222,11 +214,11 @@ public class BTreeNonLeafNode implements BTreeNode { * Below method will be used to read the measure chunk * * @param fileReader file read to read the file chunk - * @param blockIndex block index to be read from file + * @param columnIndex block index to be read from file * @return measure data chunk */ - @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) { + @Override public MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) { // operation of getting the measure chunk is not supported as its a non // leaf node // and in case of B+Tree data will be stored only in leaf node and @@ -236,20 +228,6 @@ public class 
BTreeNonLeafNode implements BTreeNode { } /** - * @return the segmentProperties - */ - public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) { - - this.deleteDeltaDataCache = deleteDeltaDataCache; - } - /** - * @return the segmentProperties - */ - public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { - return deleteDeltaDataCache; - } - - /** * number of pages in blocklet * @return */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java index 25817f5..8af7eae 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockBTreeLeafNode.java @@ -38,7 +38,7 @@ public class BlockBTreeLeafNode extends AbstractBTreeLeafNode { * node * @param metadataIndex metadata index */ - public BlockBTreeLeafNode(BTreeBuilderInfo builderInfos, int metadataIndex, long nodeNumber) { + BlockBTreeLeafNode(BTreeBuilderInfo builderInfos, int metadataIndex, long nodeNumber) { DataFileFooter footer = builderInfos.getFooterList().get(metadataIndex); BlockletMinMaxIndex minMaxIndex = footer.getBlockletIndex().getMinMaxIndex(); maxKeyOfColumns = minMaxIndex.getMaxValues(); @@ -63,8 +63,8 @@ public class BlockBTreeLeafNode extends AbstractBTreeLeafNode { * Below method is suppose to return the Blocklet ID. * @return */ - @Override public String blockletId() { - return blockInfo.getTableBlockInfo().getDetailInfo().getBlockletId().toString(); + @Override public short blockletIndex() { + return blockInfo.getTableBlockInfo().getDetailInfo().getBlockletId(); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java index 94221ba..ddd7fcf 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/btree/BlockletBTreeLeafNode.java @@ -20,7 +20,7 @@ import java.io.IOException; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.BTreeBuilderInfo; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory; @@ -69,7 +69,7 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode { * this will be used during query execution when we can * give some leaf node of a btree to one executor some to other */ - public BlockletBTreeLeafNode(BTreeBuilderInfo builderInfos, int leafIndex, long nodeNumber) { + BlockletBTreeLeafNode(BTreeBuilderInfo builderInfos, int 
leafIndex, long nodeNumber) { // get a lead node min max BlockletMinMaxIndex minMaxIndex = builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getBlockletIndex() @@ -124,23 +124,23 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode { } } - @Override public String blockletId() { - return "0"; + @Override public short blockletIndex() { + return 0; } /** * Below method will be used to get the dimension chunks * * @param fileReader file reader to read the chunks from file - * @param blockIndexes indexes of the blocks need to be read + * @param columnIndexRange indexes of the blocks need to be read * @return dimension data chunks */ - @Override public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, - int[][] blockIndexes) throws IOException { + @Override public DimensionRawColumnChunk[] readDimensionChunks(FileReader fileReader, + int[][] columnIndexRange) throws IOException { if (fileReader.isReadPageByPage()) { - return dimensionChunksPageLevelReader.readRawDimensionChunks(fileReader, blockIndexes); + return dimensionChunksPageLevelReader.readRawDimensionChunks(fileReader, columnIndexRange); } else { - return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes); + return dimensionChunksReader.readRawDimensionChunks(fileReader, columnIndexRange); } } @@ -148,15 +148,15 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode { * Below method will be used to get the dimension chunk * * @param fileReader file reader to read the chunk from file - * @param blockIndex block index to be read + * @param columnIndex block index to be read * @return dimension data chunk */ - @Override public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndex) - throws IOException { + @Override public DimensionRawColumnChunk readDimensionChunk( + FileReader fileReader, int columnIndex) throws IOException { if (fileReader.isReadPageByPage()) { - return dimensionChunksPageLevelReader.readRawDimensionChunk(fileReader, blockIndex); + return dimensionChunksPageLevelReader.readRawDimensionChunk(fileReader, columnIndex); } else { - return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndex); + return dimensionChunksReader.readRawDimensionChunk(fileReader, columnIndex); } } @@ -164,15 +164,15 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode { * Below method will be used to get the measure chunk * * @param fileReader file reader to read the chunk from file - * @param blockIndexes block indexes to be read from file + * @param columnIndexRange block indexes to be read from file * @return measure column data chunk */ - @Override public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, - int[][] blockIndexes) throws IOException { + @Override public MeasureRawColumnChunk[] readMeasureChunks(FileReader fileReader, + int[][] columnIndexRange) throws IOException { if (fileReader.isReadPageByPage()) { - return measureColumnChunkPageLevelReader.readRawMeasureChunks(fileReader, blockIndexes); + return measureColumnChunkPageLevelReader.readRawMeasureChunks(fileReader, columnIndexRange); } else { - return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes); + return measureColumnChunkReader.readRawMeasureChunks(fileReader, columnIndexRange); } } @@ -180,15 +180,15 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode { * Below method will be used to read the measure chunk * * @param fileReader file read to read the file chunk - * @param blockIndex 
block index to be read from file + * @param columnIndex block index to be read from file * @return measure data chunk */ - @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) + @Override public MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) throws IOException { if (fileReader.isReadPageByPage()) { - return measureColumnChunkPageLevelReader.readRawMeasureChunk(fileReader, blockIndex); + return measureColumnChunkPageLevelReader.readRawMeasureChunk(fileReader, columnIndex); } else { - return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex); + return measureColumnChunkReader.readRawMeasureChunk(fileReader, columnIndex); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java index 6d96b3b..597def0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java @@ -41,7 +41,13 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.Encoding; -import static org.apache.carbondata.format.Encoding.*; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_FLOATING; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING; +import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL; +import static org.apache.carbondata.format.Encoding.BOOL_BYTE; +import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS; +import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL; /** * Base class for encoding factory implementation. 
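The BlockletBTreeLeafNode diff above renames the chunk accessors: getDimensionChunks/getMeasureChunks on FileHolder become readDimensionChunks/readMeasureChunks on FileReader. A minimal caller sketch of the renamed API follows, using only the signatures visible in the diff; the helper class, its method name, and the choice of columns are hypothetical, and the interpretation of each inner int[] as an inclusive {start, end} column index range is an assumption, not something the diff states:

import java.io.IOException;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.impl.btree.BlockletBTreeLeafNode;

// Hypothetical caller, not part of this commit; it only exercises the
// renamed methods whose signatures appear in the diff above.
class ChunkReadExample {
  static void readOneBlocklet(BlockletBTreeLeafNode leaf, FileReader fileReader)
      throws IOException {
    // Assumption: each inner array is an inclusive {start, end} range of
    // column indexes, so {0, 2} asks for dimension columns 0 through 2.
    int[][] columnIndexRange = new int[][] { { 0, 2 } };
    DimensionRawColumnChunk[] dimensionChunks =
        leaf.readDimensionChunks(fileReader, columnIndexRange);
    // Single-column variant: read measure column 0.
    MeasureRawColumnChunk measureChunk = leaf.readMeasureChunk(fileReader, 0);
    // The node itself decides between the page-level and blocklet-level
    // readers via fileReader.isReadPageByPage(), as shown in the diff.
  }
}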
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java index c7411d6..daba470 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/NonSecureDictionaryServer.java @@ -27,7 +27,6 @@ import org.apache.carbondata.core.dictionary.service.AbstractDictionaryServer; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.util.CarbonProperties; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java index ce05fe2..2865d4b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -213,10 +213,6 @@ public class BlockletDetailInfo implements Serializable, Writable { return columnSchemas; } - public void setColumnSchemas(List<ColumnSchema> columnSchemas) { - this.columnSchemas = columnSchemas; - } - public void setColumnSchemaBinary(byte[] columnSchemaBinary) { this.columnSchemaBinary = columnSchemaBinary; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java new file mode 100644 index 0000000..5cd59cb --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.indexstore.blockletindex; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.constants.CarbonVersionConstants; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory; +import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader; +import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader; +import org.apache.carbondata.core.indexstore.BlockletDetailInfo; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; + +/** + * wrapper for blocklet data map data + */ +public class BlockletDataRefNode implements DataRefNode { + + private List<TableBlockInfo> blockInfos; + + private int index; + + private int[] dimensionLens; + + BlockletDataRefNode(List<TableBlockInfo> blockInfos, int index, int[] dimensionLens) { + this.blockInfos = blockInfos; + // Update row count and page count to blocklet info + for (TableBlockInfo blockInfo : blockInfos) { + BlockletDetailInfo detailInfo = blockInfo.getDetailInfo(); + detailInfo.getBlockletInfo().setNumberOfRows(detailInfo.getRowCount()); + detailInfo.getBlockletInfo().setNumberOfPages(detailInfo.getPagesCount()); + detailInfo.setBlockletId(blockInfo.getDetailInfo().getBlockletId()); + int[] pageRowCount = new int[detailInfo.getPagesCount()]; + int numberOfPagesCompletelyFilled = detailInfo.getRowCount(); + // no. 
of rows per page is 120000 in V2 and 32000 in V3; the same is used to compute the number + // of completely filled pages + if (blockInfo.getVersion() == ColumnarFormatVersion.V2) { + numberOfPagesCompletelyFilled /= + CarbonVersionConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT_V2; + } else { + numberOfPagesCompletelyFilled /= + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + } + int lastPageRowCount = detailInfo.getRowCount() + % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + for (int i = 0; i < numberOfPagesCompletelyFilled; i++) { + pageRowCount[i] = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + } + if (lastPageRowCount > 0) { + pageRowCount[pageRowCount.length - 1] = lastPageRowCount; + } + detailInfo.getBlockletInfo().setNumberOfRowsPerPage(pageRowCount); + } + this.index = index; + this.dimensionLens = dimensionLens; + } + + @Override public DataRefNode getNextDataRefNode() { + if (index + 1 < blockInfos.size()) { + return new BlockletDataRefNode(blockInfos, index + 1, dimensionLens); + } + return null; + } + + @Override public int numRows() { + return blockInfos.get(index).getDetailInfo().getRowCount(); + } + + @Override public long nodeIndex() { + return index; + } + + @Override public short blockletIndex() { + return blockInfos.get(index).getDetailInfo().getBlockletId(); + } + + @Override + public byte[][] getColumnsMaxValue() { + BlockletIndex blockletIndex = + blockInfos.get(index).getDetailInfo().getBlockletInfo().getBlockletIndex(); + // In case of blocklet distribution this will be null + if (null != blockletIndex) { + return blockletIndex.getMinMaxIndex().getMaxValues(); + } + return null; + } + + @Override + public byte[][] getColumnsMinValue() { + BlockletIndex blockletIndex = + blockInfos.get(index).getDetailInfo().getBlockletInfo().getBlockletIndex(); + // In case of blocklet distribution this will be null + if (null != blockletIndex) { + return blockletIndex.getMinMaxIndex().getMinValues(); + } + return null; + } + + @Override + public DimensionRawColumnChunk[] readDimensionChunks(FileReader fileReader, int[][] blockIndexes) + throws IOException { + DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(fileReader); + return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes); + } + + @Override + public DimensionRawColumnChunk readDimensionChunk(FileReader fileReader, int columnIndex) + throws IOException { + DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(fileReader); + return dimensionChunksReader.readRawDimensionChunk(fileReader, columnIndex); + } + + @Override + public MeasureRawColumnChunk[] readMeasureChunks(FileReader fileReader, int[][] columnIndexRange) + throws IOException { + MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); + MeasureRawColumnChunk[] measureRawColumnChunks = + measureColumnChunkReader.readRawMeasureChunks(fileReader, columnIndexRange); + updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks); + return measureRawColumnChunks; + } + + @Override public MeasureRawColumnChunk readMeasureChunk(FileReader fileReader, int columnIndex) + throws IOException { + MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); + MeasureRawColumnChunk measureRawColumnChunk = + measureColumnChunkReader.readRawMeasureChunk(fileReader, columnIndex); +
updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunk); + return measureRawColumnChunk; + } + + /** + * This method is written specifically for the old store wherein the measure min and max values + * are written opposite (i.e. min in place of max and max in place of min). Due to this, + * computing of measure filters with the current code is impacted. In order to sync with the + * current code, the min and max values are reversed only in case of old store measures. + * + * @param measureRawColumnChunk + */ + private void updateMeasureRawColumnChunkMinMaxValues( + MeasureRawColumnChunk measureRawColumnChunk) { + if (blockInfos.get(index).isDataBlockFromOldStore()) { + byte[][] maxValues = measureRawColumnChunk.getMaxValues(); + byte[][] minValues = measureRawColumnChunk.getMinValues(); + measureRawColumnChunk.setMaxValues(minValues); + measureRawColumnChunk.setMinValues(maxValues); + } + } + + private void updateMeasureRawColumnChunkMinMaxValues( + MeasureRawColumnChunk[] measureRawColumnChunks) { + if (blockInfos.get(index).isDataBlockFromOldStore()) { + for (int i = 0; i < measureRawColumnChunks.length; i++) { + if (null != measureRawColumnChunks[i]) { + updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks[i]); + } + } + } + } + + private DimensionColumnChunkReader getDimensionColumnChunkReader(FileReader fileReader) { + ColumnarFormatVersion version = + ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); + if (fileReader.isReadPageByPage()) { + return CarbonDataReaderFactory.getInstance().getDimensionColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, + blockInfos.get(index).getFilePath(), true); + } else { + return CarbonDataReaderFactory.getInstance().getDimensionColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, + blockInfos.get(index).getFilePath(), false); + } + } + + private MeasureColumnChunkReader getMeasureColumnChunkReader(FileReader fileReader) { + ColumnarFormatVersion version = + ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); + if (fileReader.isReadPageByPage()) { + return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), + blockInfos.get(index).getFilePath(), true); + } else { + return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), + blockInfos.get(index).getFilePath(), false); + } + } + + @Override public int numberOfPages() { + return blockInfos.get(index).getDetailInfo().getPagesCount(); + } + + @Override public int getPageRowCount(int pageNumber) { + return blockInfos.get(index).getDetailInfo().getBlockletInfo() + .getNumberOfRowsPerPage()[pageNumber]; + } + + public int numberOfNodes() { + return blockInfos.size(); + } + + public List<TableBlockInfo> getBlockInfos() { + return blockInfos; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java deleted file mode 100644 index 4e49ede..0000000 ---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore.blockletindex; - -import java.io.IOException; -import java.util.List; - -import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache; -import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; -import org.apache.carbondata.core.constants.CarbonVersionConstants; -import org.apache.carbondata.core.datastore.DataRefNode; -import org.apache.carbondata.core.datastore.FileHolder; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; -import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory; -import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader; -import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader; -import org.apache.carbondata.core.indexstore.BlockletDetailInfo; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; - -/** - * wrapper for blocklet data map data - */ -public class BlockletDataRefNodeWrapper implements DataRefNode { - - private List<TableBlockInfo> blockInfos; - - private int index; - - private int[] dimensionLens; - - private BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache; - - public BlockletDataRefNodeWrapper(List<TableBlockInfo> blockInfos, int index, - int[] dimensionLens) { - this.blockInfos = blockInfos; - // Update row count and page count to blocklet info - for (TableBlockInfo blockInfo : blockInfos) { - BlockletDetailInfo detailInfo = blockInfo.getDetailInfo(); - detailInfo.getBlockletInfo().setNumberOfRows(detailInfo.getRowCount()); - detailInfo.getBlockletInfo().setNumberOfPages(detailInfo.getPagesCount()); - detailInfo.setBlockletId(blockInfo.getDetailInfo().getBlockletId()); - int[] pageRowCount = new int[detailInfo.getPagesCount()]; - int numberOfPagesCompletelyFilled = detailInfo.getRowCount(); - // no. 
of rows to a page is 120000 in V2 and 32000 in V3, same is handled to get the number - // of pages filled - if (blockInfo.getVersion() == ColumnarFormatVersion.V2) { - numberOfPagesCompletelyFilled /= - CarbonVersionConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT_V2; - } else { - numberOfPagesCompletelyFilled /= - CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; - } - int lastPageRowCount = detailInfo.getRowCount() - % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; - for (int i = 0; i < numberOfPagesCompletelyFilled; i++) { - pageRowCount[i] = - CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; - } - if (lastPageRowCount > 0) { - pageRowCount[pageRowCount.length - 1] = lastPageRowCount; - } - detailInfo.getBlockletInfo().setNumberOfRowsPerPage(pageRowCount); - } - this.index = index; - this.dimensionLens = dimensionLens; - } - - @Override public DataRefNode getNextDataRefNode() { - if (index + 1 < blockInfos.size()) { - return new BlockletDataRefNodeWrapper(blockInfos, index + 1, dimensionLens); - } - return null; - } - - @Override public int nodeSize() { - return blockInfos.get(index).getDetailInfo().getRowCount(); - } - - @Override public long nodeNumber() { - return index; - } - - @Override public String blockletId() { - return blockInfos.get(index).getDetailInfo().getBlockletId().toString(); - } - - @Override - public byte[][] getColumnsMaxValue() { - BlockletIndex blockletIndex = - blockInfos.get(index).getDetailInfo().getBlockletInfo().getBlockletIndex(); - // In case of blocklet distribution this will be null - if (null != blockletIndex) { - return blockletIndex.getMinMaxIndex().getMaxValues(); - } - return null; - } - - @Override - public byte[][] getColumnsMinValue() { - BlockletIndex blockletIndex = - blockInfos.get(index).getDetailInfo().getBlockletInfo().getBlockletIndex(); - // In case of blocklet distribution this will be null - if (null != blockletIndex) { - return blockletIndex.getMinMaxIndex().getMinValues(); - } - return null; - } - - @Override - public DimensionRawColumnChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes) - throws IOException { - DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(fileReader); - return dimensionChunksReader.readRawDimensionChunks(fileReader, blockIndexes); - } - - @Override - public DimensionRawColumnChunk getDimensionChunk(FileHolder fileReader, int blockIndexes) - throws IOException { - DimensionColumnChunkReader dimensionChunksReader = getDimensionColumnChunkReader(fileReader); - return dimensionChunksReader.readRawDimensionChunk(fileReader, blockIndexes); - } - - @Override - public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes) - throws IOException { - MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); - MeasureRawColumnChunk[] measureRawColumnChunks = - measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes); - updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks); - return measureRawColumnChunks; - } - - @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, int blockIndex) - throws IOException { - MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); - MeasureRawColumnChunk measureRawColumnChunk = - measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex); - 
updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunk); - return measureRawColumnChunk; - } - - /** - * This method is written specifically for old store wherein the measure min and max values - * are written opposite (i.e min in place of max and amx in place of min). Due to this computing - * f measure filter with current code is impacted. In order to sync with current min and - * max values only in case old store and measures is reversed - * - * @param measureRawColumnChunk - */ - private void updateMeasureRawColumnChunkMinMaxValues( - MeasureRawColumnChunk measureRawColumnChunk) { - if (blockInfos.get(index).isDataBlockFromOldStore()) { - byte[][] maxValues = measureRawColumnChunk.getMaxValues(); - byte[][] minValues = measureRawColumnChunk.getMinValues(); - measureRawColumnChunk.setMaxValues(minValues); - measureRawColumnChunk.setMinValues(maxValues); - } - } - - private void updateMeasureRawColumnChunkMinMaxValues( - MeasureRawColumnChunk[] measureRawColumnChunks) { - if (blockInfos.get(index).isDataBlockFromOldStore()) { - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (null != measureRawColumnChunks[i]) { - updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks[i]); - } - } - } - } - - private DimensionColumnChunkReader getDimensionColumnChunkReader(FileHolder fileReader) { - ColumnarFormatVersion version = - ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); - if (fileReader.isReadPageByPage()) { - return CarbonDataReaderFactory.getInstance().getDimensionColumnChunkReader(version, - blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, - blockInfos.get(index).getFilePath(), true); - } else { - return CarbonDataReaderFactory.getInstance().getDimensionColumnChunkReader(version, - blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, - blockInfos.get(index).getFilePath(), false); - } - } - - private MeasureColumnChunkReader getMeasureColumnChunkReader(FileHolder fileReader) { - ColumnarFormatVersion version = - ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); - if (fileReader.isReadPageByPage()) { - return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version, - blockInfos.get(index).getDetailInfo().getBlockletInfo(), - blockInfos.get(index).getFilePath(), true); - } else { - return CarbonDataReaderFactory.getInstance().getMeasureColumnChunkReader(version, - blockInfos.get(index).getDetailInfo().getBlockletInfo(), - blockInfos.get(index).getFilePath(), false); - } - } - - @Override - public void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache) { - this.deleteDeltaDataCache = deleteDeltaDataCache; - } - - @Override public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() { - return deleteDeltaDataCache; - } - - @Override public int numberOfPages() { - return blockInfos.get(index).getDetailInfo().getPagesCount(); - } - - @Override public int getPageRowCount(int pageNumber) { - return blockInfos.get(index).getDetailInfo().getBlockletInfo() - .getNumberOfRowsPerPage()[pageNumber]; - } - - public int numberOfNodes() { - return blockInfos.size(); - } - - public List<TableBlockInfo> getBlockInfos() { - return blockInfos; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java index 17ad17f..a30f64c 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java @@ -32,7 +32,7 @@ public class IndexWrapper extends AbstractIndex { public IndexWrapper(List<TableBlockInfo> blockInfos) { segmentProperties = new SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(), blockInfos.get(0).getDetailInfo().getDimLens()); - dataRefNode = new BlockletDataRefNodeWrapper(blockInfos, 0, + dataRefNode = new BlockletDataRefNode(blockInfos, 0, segmentProperties.getDimensionColumnsValueSize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 9364a7a..6803fc8 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -19,7 +19,12 @@ package org.apache.carbondata.core.indexstore.blockletindex; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java index 242995b..53cbb1d 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java @@ -17,11 +17,11 @@ package org.apache.carbondata.core.memory; +import javax.annotation.concurrent.GuardedBy; import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; -import javax.annotation.concurrent.GuardedBy; import org.apache.carbondata.core.util.CarbonProperties;
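The page accounting in the new BlockletDataRefNode constructor above deserves a worked example: a blocklet's row count is split into completely filled pages plus an optional partial last page. Below is a standalone sketch under the page sizes stated in the code comment (120000 rows per page in V2, 32000 in V3). It parameterizes both the division and the remainder by version, whereas the committed constructor applies the V3 constant in the remainder step for both versions; the helper class and its names are illustrative only, not part of this commit.

import java.util.Arrays;

// Illustrative helper, not part of this commit.
class PageRowCountExample {
  static int[] computePageRowCounts(int rowCount, int pagesCount, boolean isV2) {
    // Rows per page as stated in the diff's comment: 120000 for V2, 32000 for V3.
    int rowsPerPage = isV2 ? 120000 : 32000;
    int[] pageRowCount = new int[pagesCount];
    int completelyFilledPages = rowCount / rowsPerPage;
    int lastPageRowCount = rowCount % rowsPerPage;
    for (int i = 0; i < completelyFilledPages; i++) {
      pageRowCount[i] = rowsPerPage;
    }
    if (lastPageRowCount > 0) {
      // The partially filled page, if any, is always the last one.
      pageRowCount[pageRowCount.length - 1] = lastPageRowCount;
    }
    return pageRowCount;
  }

  public static void main(String[] args) {
    // A V3 blocklet with 70000 rows across 3 pages yields {32000, 32000, 6000}.
    System.out.println(Arrays.toString(computePageRowCounts(70000, 3, false)));
  }
}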