http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java new file mode 100644 index 0000000..04bab54 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.compression.none; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory; +import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore; +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; +import org.apache.carbondata.core.util.ValueCompressionUtil; +import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; + +public class CompressionNoneLong extends ValueCompressionHolder<long[]> { + /** + * Attribute for Carbon LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(CompressionNoneLong.class.getName()); + /** + * longCompressor. + */ + private static Compressor compressor = CompressorFactory.getInstance().getCompressor(); + /** + * value. 
+ */ + protected long[] value; + + private DataType actualDataType; + + private MeasureDataChunkStore<long[]> measureChunkStore; + + public CompressionNoneLong(DataType actualDataType) { + this.actualDataType = actualDataType; + } + + @Override public void setValue(long[] value) { this.value = value; } + + @Override public long[] getValue() { return this.value; } + + @Override public void compress() { + compressedValue = super.compress(compressor, DataType.DATA_LONG, value); + } + + @Override + public void uncompress(DataType dataType, byte[] data, int offset, int length, + int decimalPlaces, Object maxValueObject) { + super.unCompress(compressor, dataType, data, offset, length); + setUncompressedValues(value); + } + + @Override public void setValueInBytes(byte[] byteValue) { + ByteBuffer buffer = ByteBuffer.wrap(byteValue); + this.value = ValueCompressionUtil.convertToLongArray(buffer, byteValue.length); + } + + @Override public long getLongValue(int index) { + return measureChunkStore.getLong(index); + } + + @Override public double getDoubleValue(int index) { + return measureChunkStore.getLong(index); + } + + @Override public BigDecimal getBigDecimalValue(int index) { + throw new UnsupportedOperationException("Get big decimal is not supported"); + } + + private void setUncompressedValues(long[] data) { + this.measureChunkStore = + MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, data.length); + this.measureChunkStore.putData(data); + } + + @Override public void freeMemory() { + this.measureChunkStore.freeMemory(); + } +}
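Note on the holder lifecycle shown above: setValue() stages the raw column values, compress() runs them through the configured Compressor, and after uncompress() individual reads are served from a MeasureDataChunkStore. A minimal round-trip sketch follows; it is illustrative only and assumes getCompressedData() is exposed by ValueCompressionHolder, as the @Override in CompressionBigDecimal further down suggests:

CompressionNoneLong writer = new CompressionNoneLong(DataType.DATA_LONG);
writer.setValue(new long[] { 10L, 20L, 30L });    // stage the raw column values
writer.compress();                                // encode via the configured Compressor
byte[] stored = writer.getCompressedData();       // bytes as they would be persisted

CompressionNoneLong reader = new CompressionNoneLong(DataType.DATA_LONG);
// decimalPlaces and maxValueObject are ignored by this "none" codec, hence 0 and null
reader.uncompress(DataType.DATA_LONG, stored, 0, stored.length, 0, null);
long first = reader.getLongValue(0);              // served from the MeasureDataChunkStore
reader.freeMemory();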
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java new file mode 100644 index 0000000..b84c562 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.compression.none; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory; +import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore; +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; +import org.apache.carbondata.core.util.ValueCompressionUtil; +import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; + +public class CompressionNoneShort extends ValueCompressionHolder<short[]> { + /** + * Attribute for Carbon LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(CompressionNoneShort.class.getName()); + + /** + * shortCompressor. + */ + private static Compressor compressor = CompressorFactory.getInstance().getCompressor(); + + /** + * value. 
+ */ + private short[] shortValue; + + private MeasureDataChunkStore<short[]> measureChunkStore; + + private DataType actualDataType; + + public CompressionNoneShort(DataType actualDataType) { + this.actualDataType = actualDataType; + } + + @Override public void setValue(short[] shortValue) { + this.shortValue = shortValue; + } + + @Override public short[] getValue() { return this.shortValue; } + + @Override + public void uncompress(DataType dataType, byte[] data, int offset, int length, + int decimalPlaces, Object maxValueObject) { + super.unCompress(compressor, dataType, data, offset, length); + setUncompressedValues(shortValue); + } + + @Override public void compress() { + compressedValue = super.compress(compressor, DataType.DATA_SHORT, shortValue); + } + + @Override public void setValueInBytes(byte[] value) { + ByteBuffer buffer = ByteBuffer.wrap(value); + shortValue = ValueCompressionUtil.convertToShortArray(buffer, value.length); + } + + @Override public long getLongValue(int index) { + return measureChunkStore.getShort(index); + } + + @Override public double getDoubleValue(int index) { + return measureChunkStore.getShort(index); + } + + @Override public BigDecimal getBigDecimalValue(int index) { + throw new UnsupportedOperationException( + "Big decimal is not defined for CompressionNoneShort"); + } + + private void setUncompressedValues(short[] data) { + this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE + .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length); + this.measureChunkStore.putData(data); + } + + @Override public void freeMemory() { + this.measureChunkStore.freeMemory(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/type/CompressionBigDecimal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/type/CompressionBigDecimal.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/type/CompressionBigDecimal.java new file mode 100644 index 0000000..eb0e5d0 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/type/CompressionBigDecimal.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.datastore.compression.type; + + +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; +import org.apache.carbondata.core.util.BigDecimalCompressionFinder; +import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; + +/** + * Big decimal compression + */ +public class CompressionBigDecimal<T> extends ValueCompressionHolder<T> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(CompressionBigDecimal.class.getName()); + + private BigDecimalCompressionFinder compressionFinder; + + /** + * leftPart before decimal + */ + private ValueCompressionHolder leftPart; + + /** + * rightPart after decimal + */ + private ValueCompressionHolder rightPart; + + private double divisionFactor; + + private boolean isDecimalPlacesNotZero; + + public CompressionBigDecimal(BigDecimalCompressionFinder compressionFinder, + ValueCompressionHolder leftPart, ValueCompressionHolder rightPart) { + this.compressionFinder = compressionFinder; + this.leftPart = leftPart; + this.rightPart = rightPart; + } + + @Override public void setValue(T value) { + Object[] values = (Object[]) value; + leftPart.setValue(values[0]); + rightPart.setValue(values[1]); + } + + @Override + public void uncompress(DataType dataType, byte[] data, int offset, int length, + int decimalPlaces, Object maxValueObject) { + if (decimalPlaces > 0) { + this.isDecimalPlacesNotZero = true; + } + this.divisionFactor = Math.pow(10, decimalPlaces); + + ByteBuffer buffer = ByteBuffer.wrap(data, offset, length); + int leftPathLength = buffer.getInt(); + int rightPartLength = length - leftPathLength - CarbonCommonConstants.INT_SIZE_IN_BYTE; + Long[] maxValue = (Long[]) maxValueObject; + leftPart.uncompress(compressionFinder.getLeftConvertedDataType(), data, + offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, leftPathLength, decimalPlaces, + maxValue[0]); + rightPart.uncompress(compressionFinder.getRightConvertedDataType(), data, + offset + CarbonCommonConstants.INT_SIZE_IN_BYTE + leftPathLength, rightPartLength, + decimalPlaces, maxValue[1]); + } + + @Override public long getLongValue(int index) { + throw new UnsupportedOperationException( + "Long is not defined for CompressionBigDecimal"); + } + + @Override public double getDoubleValue(int index) { + throw new UnsupportedOperationException( + "Double is not defined for CompressionBigDecimal"); + } + + @Override public BigDecimal getBigDecimalValue(int index) { + long leftValue = leftPart.getLongValue(index); + long rightValue = 0; + if (isDecimalPlacesNotZero) { + rightValue = rightPart.getLongValue(index); + } + String decimalPart = Double.toString(rightValue / this.divisionFactor); + String bigdStr = Long.toString(leftValue) + CarbonCommonConstants.POINT + decimalPart + .substring(decimalPart.indexOf(".") + 1, decimalPart.length()); + return new BigDecimal(bigdStr); + } + + @Override public T getValue() { + Object[] values = new Object[2]; + values[0] = leftPart; + values[1] = rightPart; + return (T) values; + } + + @Override public void setValueInBytes(byte[] value) { + LOGGER.error("setValueInBytes() is not defined for CompressionBigDecimal"); + } + + @Override public void compress() { + leftPart.compress(); + rightPart.compress(); + } + + @Override + 
public byte[] getCompressedData() { + byte[] leftdata = leftPart.getCompressedData(); + byte[] rightdata = rightPart.getCompressedData(); + ByteBuffer byteBuffer = ByteBuffer + .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length + + rightdata.length); + byteBuffer.putInt(leftdata.length); + byteBuffer.put(leftdata); + byteBuffer.put(rightdata); + byteBuffer.flip(); + return byteBuffer.array(); + } + + @Override public void freeMemory() { + leftPart.freeMemory(); + rightPart.freeMemory(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonReadDataHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonReadDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonReadDataHolder.java new file mode 100644 index 0000000..6bd28fc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonReadDataHolder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.datastore.dataholder; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder; + +// This class is used with Uncompressor to hold the decompressed column chunk in memory +public class CarbonReadDataHolder { + + private ValueCompressionHolder unCompressValue; + + public CarbonReadDataHolder(ValueCompressionHolder unCompressValue) { + this.unCompressValue = unCompressValue; + } + + public long getReadableLongValueByIndex(int index) { + return this.unCompressValue.getLongValue(index); + } + + public BigDecimal getReadableBigDecimalValueByIndex(int index) { + return this.unCompressValue.getBigDecimalValue(index); + } + + public double getReadableDoubleValueByIndex(int index) { + return this.unCompressValue.getDoubleValue(index); + } + + public void freeMemory() { + unCompressValue.freeMemory(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java new file mode 100644 index 0000000..23799e8 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.dataholder; + +public class CarbonWriteDataHolder { + /** + * doubleValues + */ + private double[] doubleValues; + + /** + * longValues + */ + private long[] longValues; + + /** + * bigDecimal left part + */ + private long[] bigDecimalLeftValues; + + /** + * bigDecimal right part + */ + private long[] bigDecimalRightValues; + /** + * byteValues + */ + private byte[][] byteValues; + + /** + * byteValues for no dictionary and non kettle flow. + */ + private byte[][][] byteValuesForNonDictionary; + + /** + * byteValues + */ + private byte[][][] columnByteValues; + + /** + * size + */ + private int size; + + /** + * totalSize + */ + private int totalSize; + + /** + * Method to initialise double array + * + * @param size + */ + public void initialiseDoubleValues(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + doubleValues = new double[size]; + } + + public void reset() { + size = 0; + totalSize = 0; + } + + /** + * Method to initialise double array + * TODO Remove after kettle flow got removed. 
+ * + * @param size + */ + public void initialiseByteArrayValues(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + + byteValues = new byte[size][]; + columnByteValues = new byte[size][][]; + } + + /** + * Method to initialise byte array + * + * @param size + */ + public void initialiseByteArrayValuesWithOutKettle(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + + byteValues = new byte[size][]; + } + + public void initialiseByteArrayValuesForNonDictionary(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + + byteValuesForNonDictionary = new byte[size][][]; + } + + /** + * Method to initialise long array + * + * @param size + */ + public void initialiseLongValues(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + longValues = new long[size]; + } + + public void initialiseBigDecimalValues(int size) { + if (size < 1) { + throw new IllegalArgumentException("Invalid array size"); + } + bigDecimalLeftValues = new long[size]; + bigDecimalRightValues = new long[size]; + } + + /** + * set double value by index + * + * @param index + * @param value + */ + public void setWritableDoubleValueByIndex(int index, Object value) { + doubleValues[index] = (Double) value; + size++; + } + + /** + * set double value by index + * + * @param index + * @param value + */ + public void setWritableLongValueByIndex(int index, Object value) { + longValues[index] = (Long) value; + size++; + } + + /** + * set bigdecimal value by index + * + * @param index + * @param value + */ + public void setWritableBigDecimalValueByIndex(int index, long[] value) { + bigDecimalLeftValues[index] = value[0]; + bigDecimalRightValues[index] = value[1]; + size++; + } + /** + * set byte array value by index + * + * @param index + * @param value + */ + public void setWritableByteArrayValueByIndex(int index, byte[] value) { + byteValues[index] = value; + size++; + if (null != value) totalSize += value.length; + } + + public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) { + byteValuesForNonDictionary[index] = value; + size++; + if (null != value) totalSize += value.length; + } + + /** + * set byte array value by index + */ + public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) { + int l = 0; + columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][]; + for (int i = mdKeyIndex + 1; i < columnData.length; i++) { + columnByteValues[index][l++] = (byte[]) columnData[i]; + } + } + + /** + * Get Writable Double Values + */ + public double[] getWritableDoubleValues() { + if (size < doubleValues.length) { + double[] temp = new double[size]; + System.arraycopy(doubleValues, 0, temp, 0, size); + doubleValues = temp; + } + return doubleValues; + } + + /** + * Get writable byte array values + */ + public byte[] getWritableByteArrayValues() { + byte[] temp = new byte[totalSize]; + int startIndexToCopy = 0; + for (int i = 0; i < size; i++) { + System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length); + startIndexToCopy += byteValues[i].length; + } + return temp; + } + + public byte[][] getByteArrayValues() { + if (size < byteValues.length) { + byte[][] temp = new byte[size][]; + System.arraycopy(byteValues, 0, temp, 0, size); + byteValues = temp; + } + return byteValues; + } + + public byte[][][] getNonDictByteArrayValues() { + if (size < byteValuesForNonDictionary.length) { 
+ byte[][][] temp = new byte[size][][]; + System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size); + byteValuesForNonDictionary = temp; + } + return byteValuesForNonDictionary; + } + + /** + * Get writable long values + * + * @return + */ + public long[] getWritableLongValues() { + if (size < longValues.length) { + long[] temp = new long[size]; + System.arraycopy(longValues, 0, temp, 0, size); + longValues = temp; + } + return longValues; + } + + /** + * Get writable big decimal values + * + * @return + */ + public long[][] getWritableBigDecimalValues() { + long[][] bigDecimalValues = new long[2][]; + if (size < bigDecimalLeftValues.length) { + long[] temp = new long[size]; + System.arraycopy(bigDecimalLeftValues, 0, temp, 0, size); + bigDecimalLeftValues = temp; + } + if (size < bigDecimalRightValues.length) { + long[] temp = new long[size]; + System.arraycopy(bigDecimalRightValues, 0, temp, 0, size); + bigDecimalRightValues = temp; + } + bigDecimalValues[0] = bigDecimalLeftValues; + bigDecimalValues[1] = bigDecimalRightValues; + return bigDecimalValues; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java new file mode 100644 index 0000000..01ad646 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public abstract class AbstractDFSCarbonFile implements CarbonFile { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName()); + protected FileStatus fileStatus; + protected FileSystem fs; + + public AbstractDFSCarbonFile(String filePath) { + filePath = filePath.replace("\\", "/"); + Path path = new Path(filePath); + try { + fs = path.getFileSystem(FileFactory.getConfiguration()); + fileStatus = fs.getFileStatus(path); + } catch (IOException e) { + LOGGER.error("Exception occured:" + e.getMessage()); + } + } + + public AbstractDFSCarbonFile(Path path) { + try { + fs = path.getFileSystem(FileFactory.getConfiguration()); + fileStatus = fs.getFileStatus(path); + } catch (IOException e) { + LOGGER.error("Exception occured:" + e.getMessage()); + } + } + + public AbstractDFSCarbonFile(FileStatus fileStatus) { + this.fileStatus = fileStatus; + } + + @Override public boolean createNewFile() { + Path path = fileStatus.getPath(); + try { + return fs.createNewFile(path); + } catch (IOException e) { + return false; + } + } + + @Override public String getAbsolutePath() { + return fileStatus.getPath().toString(); + } + + @Override public String getName() { + return fileStatus.getPath().getName(); + } + + @Override public boolean isDirectory() { + return fileStatus.isDirectory(); + } + + @Override public boolean exists() { + try { + if (null != fileStatus) { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + return fs.exists(fileStatus.getPath()); + } + } catch (IOException e) { + LOGGER.error("Exception occured:" + e.getMessage()); + } + return false; + } + + @Override public String getCanonicalPath() { + return getAbsolutePath(); + } + + @Override public String getPath() { + return getAbsolutePath(); + } + + @Override public long getSize() { + return fileStatus.getLen(); + } + + public boolean renameTo(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + return fs.rename(fileStatus.getPath(), new Path(changetoName)); + } catch (IOException e) { + LOGGER.error("Exception occured:" + e.getMessage()); + return false; + } + } + + public boolean delete() { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + return fs.delete(fileStatus.getPath(), true); + } catch (IOException e) { + LOGGER.error("Exception occured:" + e.getMessage()); + return false; + } + } + + @Override public long getLastModifiedTime() { + return fileStatus.getModificationTime(); + } + + @Override public boolean setLastModifiedTime(long timestamp) { + try { + fs.setTimes(fileStatus.getPath(), timestamp, timestamp); + } catch (IOException e) { + return false; + } + return true; + } + + /** + * This method will delete the data in file data from a given offset + */ + @Override public boolean truncate(String fileName, long 
validDataEndOffset) { + DataOutputStream dataOutputStream = null; + DataInputStream dataInputStream = null; + boolean fileTruncatedSuccessfully = false; + // if there are fewer than 1024 bytes to read, the buffer size should equal the given offset + int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ? + CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR : + (int) validDataEndOffset; + // temporary file name + String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; + FileFactory.FileType fileType = FileFactory.getFileType(fileName); + try { + CarbonFile tempFile = null; + // delete temporary file if it already exists at a given path + if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { + tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); + tempFile.delete(); + } + // create new temporary file + FileFactory.createNewFile(tempWriteFilePath, fileType); + tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); + byte[] buff = new byte[bufferSize]; + dataInputStream = FileFactory.getDataInputStream(fileName, fileType); + // read the data + int read = dataInputStream.read(buff, 0, buff.length); + dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType); + dataOutputStream.write(buff, 0, read); + long remaining = validDataEndOffset - read; + // never read past the valid data end offset + while (remaining > 0) { + if (remaining > bufferSize) { + buff = new byte[bufferSize]; + } else { + buff = new byte[(int) remaining]; + } + read = dataInputStream.read(buff, 0, buff.length); + dataOutputStream.write(buff, 0, read); + remaining = remaining - read; + } + CarbonUtil.closeStreams(dataInputStream, dataOutputStream); + // rename the temp file to original file + tempFile.renameForce(fileName); + fileTruncatedSuccessfully = true; + } catch (IOException e) { + LOGGER.error("Exception occurred while truncating the file " + e.getMessage()); + } finally { + CarbonUtil.closeStreams(dataOutputStream, dataInputStream); + } + return fileTruncatedSuccessfully; + } + + /** + * This method will be used to check whether a file has been modified or not + * + * @param fileTimeStamp time to be compared with latest timestamp of file + * @param endOffset file length to be compared with current length of file + * @return + */ + @Override public boolean isFileModified(long fileTimeStamp, long endOffset) { + boolean isFileModified = false; + if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) { + isFileModified = true; + } + return isFileModified; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java new file mode 100644 index 0000000..51fcd53 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; + + + +public class AlluxioCarbonFile extends AbstractDFSCarbonFile { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName()); + + public AlluxioCarbonFile(String filePath) { + super(filePath); + } + + public AlluxioCarbonFile(Path path) { + super(path); + } + + public AlluxioCarbonFile(FileStatus fileStatus) { + super(fileStatus); + } + + /** + * @param listStatus + * @return + */ + private CarbonFile[] getFiles(FileStatus[] listStatus) { + if (listStatus == null) { + return new CarbonFile[0]; + } + CarbonFile[] files = new CarbonFile[listStatus.length]; + for (int i = 0; i < files.length; i++) { + files[i] = new AlluxioCarbonFile(listStatus[i]); + } + return files; + } + + @Override + public CarbonFile[] listFiles() { + FileStatus[] listStatus = null; + try { + if (null != fileStatus && fileStatus.isDirectory()) { + Path path = fileStatus.getPath(); + listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); + } else { + return null; + } + } catch (IOException e) { + LOGGER.error("Exception occured: " + e.getMessage()); + return new CarbonFile[0]; + } + return getFiles(listStatus); + } + + @Override + public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + CarbonFile[] files = listFiles(); + if (files != null && files.length >= 1) { + List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); + for (int i = 0; i < files.length; i++) { + if (fileFilter.accept(files[i])) { + fileList.add(files[i]); + } + } + if (fileList.size() >= 1) { + return fileList.toArray(new CarbonFile[fileList.size()]); + } else { + return new CarbonFile[0]; + } + } + return files; + } + + @Override + public CarbonFile getParentFile() { + Path parent = fileStatus.getPath().getParent(); + return null == parent ? 
null : new AlluxioCarbonFile(parent); + } + + @Override + public boolean renameForce(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), + org.apache.hadoop.fs.Options.Rename.OVERWRITE); + return true; + } else { + return false; + } + } catch (IOException e) { + LOGGER.error("Exception occurred: " + e.getMessage()); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java new file mode 100644 index 0000000..a00f633 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.datastore.filesystem; + +public interface CarbonFile { + + String getAbsolutePath(); + + CarbonFile[] listFiles(CarbonFileFilter fileFilter); + + CarbonFile[] listFiles(); + + String getName(); + + boolean isDirectory(); + + boolean exists(); + + String getCanonicalPath(); + + CarbonFile getParentFile(); + + String getPath(); + + long getSize(); + + boolean renameTo(String changetoName); + + boolean renameForce(String changetoName); + + boolean delete(); + + boolean createNewFile(); + + long getLastModifiedTime(); + + boolean setLastModifiedTime(long timestamp); + + boolean truncate(String fileName, long validDataEndOffset); + + /** + * This method will be used to check whether a file has been modified or not + * + * @param fileTimeStamp time to be compared with latest timestamp of file + * @param endOffset file length to be compared with current length of file + * @return + */ + boolean isFileModified(long fileTimeStamp, long endOffset); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFileFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFileFilter.java new file mode 100644 index 0000000..dc3df36 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFileFilter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.filesystem; + +public interface CarbonFileFilter { + boolean accept(CarbonFile file); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java new file mode 100644 index 0000000..7137087 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; + +public class HDFSCarbonFile extends AbstractDFSCarbonFile { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(HDFSCarbonFile.class.getName()); + + public HDFSCarbonFile(String filePath) { + super(filePath); + } + + public HDFSCarbonFile(Path path) { + super(path); + } + + public HDFSCarbonFile(FileStatus fileStatus) { + super(fileStatus); + } + + /** + * @param listStatus + * @return + */ + private CarbonFile[] getFiles(FileStatus[] listStatus) { + if (listStatus == null) { + return new CarbonFile[0]; + } + CarbonFile[] files = new CarbonFile[listStatus.length]; + for (int i = 0; i < files.length; i++) { + files[i] = new HDFSCarbonFile(listStatus[i]); + } + return files; + } + + @Override + public CarbonFile[] listFiles() { + FileStatus[] listStatus = null; + try { + if (null != fileStatus && fileStatus.isDirectory()) { + Path path = fileStatus.getPath(); + listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); + } else { + return null; + } + } catch (IOException e) { + LOGGER.error("Exception occured: " + e.getMessage()); + return new CarbonFile[0]; + } + return getFiles(listStatus); + } + + @Override + public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + CarbonFile[] files = listFiles(); + if (files != null && files.length >= 1) { + List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); + for (int i = 0; i < files.length; i++) { + if (fileFilter.accept(files[i])) { + fileList.add(files[i]); + } + } + if (fileList.size() >= 1) { + return fileList.toArray(new CarbonFile[fileList.size()]); + } else { + return new CarbonFile[0]; + } + } + return files; + } + + @Override + public CarbonFile getParentFile() { + Path parent = fileStatus.getPath().getParent(); + return null == parent ? 
null : new HDFSCarbonFile(parent); + } + + @Override + public boolean renameForce(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), + org.apache.hadoop.fs.Options.Rename.OVERWRITE); + return true; + } else { + return false; + } + } catch (IOException e) { + LOGGER.error("Exception occured: " + e.getMessage()); + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java new file mode 100644 index 0000000..a6dbfa7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.hadoop.fs.Path; + +public class LocalCarbonFile implements CarbonFile { + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalCarbonFile.class.getName()); + private File file; + + public LocalCarbonFile(String filePath) { + Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); + file = new File(pathWithoutSchemeAndAuthority.toString()); + } + + public LocalCarbonFile(File file) { + this.file = file; + } + + @Override public String getAbsolutePath() { + return file.getAbsolutePath(); + } + + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + if (!file.isDirectory()) { + return null; + } + + File[] files = file.listFiles(new FileFilter() { + + @Override public boolean accept(File pathname) { + return fileFilter.accept(new LocalCarbonFile(pathname)); + } + }); + + if (files == null) { + return new CarbonFile[0]; + } + + CarbonFile[] carbonFiles = new CarbonFile[files.length]; + + for (int i = 0; i < carbonFiles.length; i++) { + carbonFiles[i] = new LocalCarbonFile(files[i]); + } + + return carbonFiles; + } + + @Override public String getName() { + return file.getName(); + } + + @Override public boolean isDirectory() { + return file.isDirectory(); + } + + @Override public boolean exists() { + if (file != null) { + return file.exists(); + } + return false; + } + + @Override public String getCanonicalPath() { + try { + return file.getCanonicalPath(); + } catch (IOException e) { + LOGGER + .error(e, "Exception occured" + e.getMessage()); + } + return null; + } + + @Override public CarbonFile getParentFile() { + return new LocalCarbonFile(file.getParentFile()); + } + + @Override public String getPath() { + return file.getPath(); + } + + @Override public long getSize() { + return file.length(); + } + + public boolean renameTo(String changetoName) { + return file.renameTo(new File(changetoName)); + } + + public boolean delete() { + return file.delete(); + } + + @Override public CarbonFile[] listFiles() { + + if (!file.isDirectory()) { + return null; + } + File[] files = file.listFiles(); + if (files == null) { + return new CarbonFile[0]; + } + CarbonFile[] carbonFiles = new CarbonFile[files.length]; + for (int i = 0; i < carbonFiles.length; i++) { + carbonFiles[i] = new LocalCarbonFile(files[i]); + } + + return carbonFiles; + + } + + @Override public boolean createNewFile() { + try { + return file.createNewFile(); + } catch (IOException e) { + return false; + } + } + + @Override public long getLastModifiedTime() { + return file.lastModified(); + } + + @Override public boolean setLastModifiedTime(long timestamp) { + return file.setLastModified(timestamp); + } + + /** + * This method will delete the data in file data from a given offset + */ + @Override public boolean truncate(String fileName, long validDataEndOffset) { + FileChannel source = null; + FileChannel destination = null; + boolean fileTruncatedSuccessfully = false; + // 
temporary file name + String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; + FileFactory.FileType fileType = FileFactory.getFileType(fileName); + try { + CarbonFile tempFile = null; + // delete temporary file if it already exists at a given path + if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { + tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); + tempFile.delete(); + } + // create new temporary file + FileFactory.createNewFile(tempWriteFilePath, fileType); + tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); + source = new FileInputStream(fileName).getChannel(); + destination = new FileOutputStream(tempWriteFilePath).getChannel(); + long read = destination.transferFrom(source, 0, validDataEndOffset); + long totalBytesRead = read; + long remaining = validDataEndOffset - totalBytesRead; + // keep copying until the required data offset is reached + while (remaining > 0) { + read = destination.transferFrom(source, totalBytesRead, remaining); + totalBytesRead = totalBytesRead + read; + remaining = remaining - read; + } + CarbonUtil.closeStreams(source, destination); + // rename the temp file to original file + tempFile.renameForce(fileName); + fileTruncatedSuccessfully = true; + } catch (IOException e) { + LOGGER.error("Exception occurred while truncating the file " + e.getMessage()); + } finally { + CarbonUtil.closeStreams(source, destination); + } + return fileTruncatedSuccessfully; + } + + /** + * This method will be used to check whether a file has been modified or not + * + * @param fileTimeStamp time to be compared with latest timestamp of file + * @param endOffset file length to be compared with current length of file + * @return + */ + @Override public boolean isFileModified(long fileTimeStamp, long endOffset) { + boolean isFileModified = false; + if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) { + isFileModified = true; + } + return isFileModified; + } + + @Override public boolean renameForce(String changetoName) { + File destFile = new File(changetoName); + // delete the destination file if it already exists, then rename + if (destFile.exists()) { + destFile.delete(); + } + return file.renameTo(destFile); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java new file mode 100644 index 0000000..c7dd192 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.core.datastore.filesystem; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; + +public class ViewFSCarbonFile extends AbstractDFSCarbonFile { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName()); + + public ViewFSCarbonFile(String filePath) { + super(filePath); + } + + public ViewFSCarbonFile(Path path) { + super(path); + } + + public ViewFSCarbonFile(FileStatus fileStatus) { + super(fileStatus); + } + + /** + * @param listStatus + * @return + */ + private CarbonFile[] getFiles(FileStatus[] listStatus) { + if (listStatus == null) { + return new CarbonFile[0]; + } + CarbonFile[] files = new CarbonFile[listStatus.length]; + for (int i = 0; i < files.length; i++) { + files[i] = new ViewFSCarbonFile(listStatus[i]); + } + return files; + } + + @Override + public CarbonFile[] listFiles() { + FileStatus[] listStatus = null; + try { + if (null != fileStatus && fileStatus.isDirectory()) { + Path path = fileStatus.getPath(); + listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); + } else { + return null; + } + } catch (IOException ex) { + LOGGER.error("Exception occured" + ex.getMessage()); + return new CarbonFile[0]; + } + return getFiles(listStatus); + } + + @Override + public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { + CarbonFile[] files = listFiles(); + if (files != null && files.length >= 1) { + List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); + for (int i = 0; i < files.length; i++) { + if (fileFilter.accept(files[i])) { + fileList.add(files[i]); + } + } + if (fileList.size() >= 1) { + return fileList.toArray(new CarbonFile[fileList.size()]); + } else { + return new CarbonFile[0]; + } + } + return files; + } + + @Override public CarbonFile getParentFile() { + Path parent = fileStatus.getPath().getParent(); + return null == parent ? 
null : new ViewFSCarbonFile(parent); + } + + @Override + public boolean renameForce(String changetoName) { + FileSystem fs; + try { + fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); + if (fs instanceof ViewFileSystem) { + fs.delete(new Path(changetoName), true); + fs.rename(fileStatus.getPath(), new Path(changetoName)); + return true; + } else { + return false; + } + } catch (IOException e) { + LOGGER.error("Exception occured" + e.getMessage()); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/impl/CompressedDataMeasureDataWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/CompressedDataMeasureDataWrapper.java new file mode 100644 index 0000000..fe754b1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/CompressedDataMeasureDataWrapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.datastore.impl; + +import org.apache.carbondata.core.datastore.MeasureDataWrapper; +import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder; + +public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper { + + private final CarbonReadDataHolder[] values; + + public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) { + this.values = values; + } + + @Override public CarbonReadDataHolder[] getValues() { + return values; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java new file mode 100644 index 0000000..d97cdf3 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.FileHolder;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+public class DFSFileHolderImpl implements FileHolder {
+  /**
+   * cache to hold filename and its stream
+   */
+  private Map<String, FSDataInputStream> fileNameAndStreamCache;
+
+  public DFSFileHolderImpl() {
+    this.fileNameAndStreamCache =
+        new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  @Override public byte[] readByteArray(String filePath, long offset, int length)
+      throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return read(fileChannel, length, offset);
+  }
+
+  /**
+   * This method checks whether a stream for the given file path is already
+   * present in the cache; if not, it creates the stream and adds it to the
+   * cache, otherwise it returns the cached stream
+   *
+   * @param filePath fully qualified file path
+   * @return channel
+   */
+  private FSDataInputStream updateCache(String filePath) throws IOException {
+    FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
+    if (null == fileChannel) {
+      Path pt = new Path(filePath);
+      FileSystem fs = FileSystem.get(FileFactory.getConfiguration());
+      fileChannel = fs.open(pt);
+      fileNameAndStreamCache.put(filePath, fileChannel);
+    }
+    return fileChannel;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be read and position
+   *
+   * @param channel file channel
+   * @param size number of bytes
+   * @param offset position
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException {
+    byte[] byteBuffer = new byte[size];
+    channel.seek(offset);
+    channel.readFully(byteBuffer);
+    return byteBuffer;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be read
+   *
+   * @param channel file channel
+   * @param size number of bytes
+   * @return byte buffer
+   */
+  private byte[] read(FSDataInputStream channel, int size) throws IOException {
+    byte[] byteBuffer = new byte[size];
+    channel.readFully(byteBuffer);
+    return byteBuffer;
+  }
+
+  @Override public int readInt(String filePath, long offset) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readInt();
+  }
+
+  @Override public long readDouble(String filePath, long offset) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readLong();
+  }
+
+  @Override public void finish() throws IOException {
+    for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) {
+      FSDataInputStream channel = entry.getValue();
+      if (null != channel) {
+        channel.close();
+      }
+    }
+  }
+
+  @Override public byte[] readByteArray(String filePath, int length) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return read(fileChannel, length);
+  }
+
+  @Override public long readLong(String filePath, long offset) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    fileChannel.seek(offset);
+    return fileChannel.readLong();
+  }
+
+  @Override public int readInt(String filePath) throws IOException {
+    FSDataInputStream fileChannel = updateCache(filePath);
+    return fileChannel.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
new file mode 100644
index 0000000..db108ed
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+
+public final class FileFactory {
+  private static Configuration configuration = null;
+
+  static {
+    configuration = new Configuration();
+    configuration.addResource(new Path("../core-default.xml"));
+  }
+
+  private FileFactory() {
+
+  }
+
+  public static Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public static FileHolder getFileHolder(FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new FileHolderImpl();
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        return new DFSFileHolderImpl();
+      default:
+        return new FileHolderImpl();
+    }
+  }
+
+  public static FileType getFileType(String path) {
+    if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
+      return FileType.HDFS;
+    } else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+      return FileType.ALLUXIO;
+    } else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+      return FileType.VIEWFS;
+    }
+    return FileType.LOCAL;
+  }
+
+  public static CarbonFile getCarbonFile(String path, FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new LocalCarbonFile(path);
+      case HDFS:
+        return new HDFSCarbonFile(path);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
+      default:
+        return new LocalCarbonFile(path);
+    }
+  }
+
+  public static DataInputStream getDataInputStream(String path, FileType fileType)
+      throws IOException {
+    return getDataInputStream(path, fileType, -1);
+  }
+
+  public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
+      throws IOException {
+    path = path.replace("\\", "/");
+    boolean gzip = path.endsWith(".gz");
+    boolean bzip2 = path.endsWith(".bz2");
+    InputStream stream;
+    switch (fileType) {
+      case LOCAL:
+        if (gzip) {
+          stream = new GZIPInputStream(new FileInputStream(path));
+        } else if (bzip2) {
+          stream = new BZip2CompressorInputStream(new FileInputStream(path));
+        } else {
+          stream = new FileInputStream(path);
+        }
+        break;
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        if (bufferSize == -1) {
+          stream = fs.open(pt);
+        } else {
+          stream = fs.open(pt, bufferSize);
+        }
+        if (gzip) {
+          GzipCodec codec = new GzipCodec();
+          stream = codec.createInputStream(stream);
+        } else if (bzip2) {
+          BZip2Codec codec = new BZip2Codec();
+          stream = codec.createInputStream(stream);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("unsupported file system");
+    }
+    return new DataInputStream(new BufferedInputStream(stream));
+  }
+
+  /**
+   * Returns a DataInputStream that has been positioned (seeked) at the given offset of the file
+   *
+   * @param path file path
+   * @param fileType file type (LOCAL/HDFS/ALLUXIO/VIEWFS)
+   * @param bufferSize read buffer size
+   * @param offset position to seek to before reading
+   * @return DataInputStream positioned at the offset
+   * @throws IOException
+   */
+  public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
+      long offset) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataInputStream stream = fs.open(pt, bufferSize);
+        stream.seek(offset);
+        return new DataInputStream(new BufferedInputStream(stream));
+      default:
+        FileInputStream fis = new FileInputStream(path);
+        long actualSkipSize = 0;
+        long skipSize = offset;
+        // InputStream.skip may skip fewer bytes than requested, so loop
+        // until the full offset has been skipped
+        while (actualSkipSize != offset) {
+          actualSkipSize += fis.skip(skipSize);
+          skipSize = offset - actualSkipSize;
+        }
+        return new DataInputStream(new BufferedInputStream(fis));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = fs.create(pt, true);
+        return stream;
+      default:
+        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType,
+      int bufferSize, boolean append) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = null;
+        if (append) {
+          // append to a file only if it already exists, otherwise a file-not-found
+          // exception will be thrown by hdfs
+          if (CarbonUtil.isFileExists(path)) {
+            stream = fs.append(pt, bufferSize);
+          } else {
+            stream = fs.create(pt, true, bufferSize);
+          }
+        } else {
+          stream = fs.create(pt, true, bufferSize);
+        }
+        return stream;
+      default:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+    }
+  }
+
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType,
+      int bufferSize, long blockSize) throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream =
+            fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
+        return stream;
+      default:
+        return new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+    }
+  }
+
+  /**
+   * This method checks whether the given path exists, and additionally
+   * whether it is a file, when performFileCheck is true
+   *
+   * @param filePath - Path
+   * @param fileType - FileType Local/HDFS
+   * @param performFileCheck - provide false for folders, true for files
+   */
+  public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        if (performFileCheck) {
+          return fs.exists(path) && fs.isFile(path);
+        } else {
+          return fs.exists(path);
+        }
+
+      case LOCAL:
+      default:
+        File defaultFile = new File(filePath);
+
+        if (performFileCheck) {
+          return defaultFile.exists() && defaultFile.isFile();
+        } else {
+          return defaultFile.exists();
+        }
+    }
+  }
+
+  /**
+   * This method checks whether the given path exists
+   *
+   * @param filePath - Path
+   * @param fileType - FileType Local/HDFS
+   */
+  public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.exists(path);
+
+      case LOCAL:
+      default:
+        File defaultFile = new File(filePath);
+        return defaultFile.exists();
+    }
+  }
+
+  public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.createNewFile(path);
+
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.createNewFile();
+    }
+  }
+
+  public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.delete(path, true);
+
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return deleteAllFilesOfDir(file);
+    }
+  }
+
+  public static boolean deleteAllFilesOfDir(File path) {
+    if (!path.exists()) {
+      return true;
+    }
+    if (path.isFile()) {
+      return path.delete();
+    }
+    File[] files = path.listFiles();
+    if (null == files) {
+      // listFiles can return null if an I/O error occurs while listing
+      return false;
+    }
+    for (int i = 0; i < files.length; i++) {
+      deleteAllFilesOfDir(files[i]);
+    }
+    return path.delete();
+  }
+
+  public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.mkdirs(path);
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.mkdirs();
+    }
+  }
+
+  /**
+   * Returns a DataOutputStream opened via the filesystem append API
+   *
+   * @param path file path
+   * @param fileType file type
+   * @return DataOutputStream opened in append mode
+   * @throws IOException
+   */
+  public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    switch (fileType) {
+      case LOCAL:
+        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path pt = new Path(path);
+        FileSystem fs = pt.getFileSystem(configuration);
+        FSDataOutputStream stream = fs.append(pt);
+        return stream;
+      default:
+        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+    }
+  }
+
+  /**
+   * Creates a new lock file; if it is created successfully, it is registered
+   * for deletion on JVM exit so that it is cleaned up even after an abrupt shutdown
+   *
+   * @param filePath file path
+   * @param fileType file type
+   * @return true if the lock file was newly created
+   * @throws IOException
+   */
+  public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        if (fs.createNewFile(path)) {
+          fs.deleteOnExit(path);
+          return true;
+        }
+        return false;
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return file.createNewFile();
+    }
+  }
+
+  public enum FileType {
+    LOCAL, HDFS, ALLUXIO, VIEWFS
+  }
+
+  /**
+   * Below method will be used to update the file path:
+   * for the local file type it removes the scheme and authority
+   * (e.g. file:/) from the path
+   *
+   * @param filePath file path
+   * @return updated file path without URL for local
+   */
+  public static String getUpdatedFilePath(String filePath) {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        return filePath;
+      case LOCAL:
+      default:
+        Path pathWithoutSchemeAndAuthority =
+            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+        return pathWithoutSchemeAndAuthority.toString();
+    }
+  }
+
+  /**
+   * It computes the size of a directory
+   *
+   * @param filePath file path
+   * @return size in bytes
+   * @throws IOException
+   */
+  public static long getDirectorySize(String filePath) throws IOException {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+        Path path = new Path(filePath);
+        FileSystem fs = path.getFileSystem(configuration);
+        return fs.getContentSummary(path).getLength();
+      case LOCAL:
+      default:
+        File file = new File(filePath);
+        return FileUtils.sizeOfDirectory(file);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
new file mode 100644
index 0000000..48928c6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileHolderImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.FileHolder;
+
+public class FileHolderImpl implements FileHolder {
+  /**
+   * cache to hold filename and its stream
+   */
+  private Map<String, FileChannel> fileNameAndStreamCache;
+
+  /**
+   * FileHolderImpl Constructor
+   * It will create the cache
+   */
+  public FileHolderImpl() {
+    this.fileNameAndStreamCache =
+        new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  public FileHolderImpl(int capacity) {
+    this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity);
+  }
+
+  /**
+   * This method will be used to read the byte array from file based on offset
+   * and length (number of bytes) to be read
+   *
+   * @param filePath fully qualified file path
+   * @param offset reading start position
+   * @param length number of bytes to be read
+   * @return read byte array
+   */
+  @Override public byte[] readByteArray(String filePath, long offset, int length)
+      throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, length, offset);
+    return byteBuffer.array();
+  }
+
+  /**
+   * This method will be used to close all the streams currently present in the cache
+   */
+  @Override public void finish() throws IOException {
+    for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) {
+      FileChannel channel = entry.getValue();
+      if (null != channel) {
+        channel.close();
+      }
+    }
+  }
+
+  /**
+   * This method will be used to read an int from file from position (offset);
+   * here length will always be 4 because int byte size is 4
+   *
+   * @param filePath fully qualified file path
+   * @param offset reading start position
+   * @return read int
+   */
+  @Override public int readInt(String filePath, long offset) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset);
+    return byteBuffer.getInt();
+  }
+
+  /**
+   * This method will be used to read an int from the current position of the file;
+   * here length will always be 4 because int byte size is 4
+   *
+   * @param filePath fully qualified file path
+   * @return read int
+   */
+  @Override public int readInt(String filePath) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    return byteBuffer.getInt();
+  }
+
+  /**
+   * This method will be used to read a long from file from position (offset);
+   * here length will always be 8 because long byte size is 8
+   *
+   * @param filePath fully qualified file path
+   * @param offset reading start position
+   * @return read long
+   */
+  @Override public long readDouble(String filePath, long offset) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+    return byteBuffer.getLong();
+  }
+
+  /**
+   * This method checks whether a stream for the given file path is already
+   * present in the cache; if not, it creates the stream and adds it to the
+   * cache, otherwise it returns the cached stream
+   *
+   * @param filePath fully qualified file path
+   * @return channel
+   */
+  private FileChannel updateCache(String filePath) throws FileNotFoundException {
+    FileChannel fileChannel = fileNameAndStreamCache.get(filePath);
+    if (null == fileChannel) {
+      FileInputStream stream = new FileInputStream(filePath);
+      fileChannel = stream.getChannel();
+      fileNameAndStreamCache.put(filePath, fileChannel);
+    }
+    return fileChannel;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be read and position
+   *
+   * @param channel file channel
+   * @param size number of bytes
+   * @param offset position
+   * @return byte buffer
+   */
+  private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+    channel.position(offset);
+    channel.read(byteBuffer);
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+  /**
+   * This method will be used to read from file based on number of bytes to be read
+   *
+   * @param channel file channel
+   * @param size number of bytes
+   * @return byte buffer
+   */
+  private ByteBuffer read(FileChannel channel, int size) throws IOException {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
+    channel.read(byteBuffer);
+    byteBuffer.rewind();
+    return byteBuffer;
+  }
+
+
+  /**
+   * This method will be used to read the byte array from file based on length (number of bytes)
+   *
+   * @param filePath fully qualified file path
+   * @param length number of bytes to be read
+   * @return read byte array
+   */
+  @Override public byte[] readByteArray(String filePath, int length) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, length);
+    return byteBuffer.array();
+  }
+
+  /**
+   * This method will be used to read a long from file from position (offset);
+   * here length will always be 8 because long byte size is 8
+   *
+   * @param filePath fully qualified file path
+   * @param offset reading start position
+   * @return read long
+   */
+  @Override public long readLong(String filePath, long offset) throws IOException {
+    FileChannel fileChannel = updateCache(filePath);
+    ByteBuffer byteBuffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset);
+    return byteBuffer.getLong();
+  }
+
+}
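[Editor's note - not part of the patch] For context, a minimal usage sketch of the FileFactory/FileHolder abstraction introduced above: the caller resolves the file type from the path prefix, obtains a holder (DFSFileHolderImpl for HDFS/ALLUXIO/VIEWFS, FileHolderImpl for LOCAL), reads, and finally calls finish() to close every cached stream. The sample path, class name, and offsets below are hypothetical, and this assumes CarbonUtil.HDFS_PREFIX matches the hdfs:// scheme.

import java.io.IOException;

import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;

public class FileHolderUsageSketch {
  public static void main(String[] args) throws IOException {
    String filePath = "hdfs://namenode:8020/tmp/sample.carbondata"; // hypothetical path
    FileType fileType = FileFactory.getFileType(filePath);   // resolves from the path prefix
    FileHolder holder = FileFactory.getFileHolder(fileType);  // DFSFileHolderImpl for HDFS
    try {
      byte[] header = holder.readByteArray(filePath, 0L, 64); // 64 bytes at offset 0
      int firstInt = holder.readInt(filePath, 0L);            // 4 bytes at offset 0
      System.out.println("read " + header.length + " bytes, first int = " + firstInt);
    } finally {
      holder.finish(); // closes all streams cached by the holder
    }
  }
}

Because both implementations cache one open stream per file path, a holder is intended to be reused across many reads of the same files and must always be finished to avoid leaking file handles.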