http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java deleted file mode 100644 index c22464d..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneLong.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.compression.none; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.compression.Compressor; -import org.apache.carbondata.core.datastorage.compression.CompressorFactory; -import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder; -import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory; -import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore; -import org.apache.carbondata.core.util.ValueCompressionUtil; -import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; - -public class CompressionNoneLong extends ValueCompressionHolder<long[]> { - /** - * Attribute for Carbon LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(CompressionNoneLong.class.getName()); - /** - * longCompressor. - */ - private static Compressor compressor = CompressorFactory.getInstance().getCompressor(); - /** - * value. - */ - protected long[] value; - - private DataType actualDataType; - - private MeasureDataChunkStore<long[]> measureChunkStore; - - public CompressionNoneLong(DataType actualDataType) { - this.actualDataType = actualDataType; - } - - @Override public void setValue(long[] value) { this.value = value; } - - @Override public long[] getValue() { return this.value; } - - @Override public void compress() { - compressedValue = super.compress(compressor, DataType.DATA_LONG, value); - } - - @Override - public void uncompress(DataType dataType, byte[] data, int offset, int length, - int decimalPlaces, Object maxValueObject) { - super.unCompress(compressor, dataType, data, offset, length); - setUncompressedValues(value); - } - - @Override public void setValueInBytes(byte[] byteValue) { - ByteBuffer buffer = ByteBuffer.wrap(byteValue); - this.value = ValueCompressionUtil.convertToLongArray(buffer, byteValue.length); - } - - @Override public long getLongValue(int index) { - return measureChunkStore.getLong(index); - } - - @Override public double getDoubleValue(int index) { - return measureChunkStore.getLong(index); - } - - @Override public BigDecimal getBigDecimalValue(int index) { - throw new UnsupportedOperationException("Get big decimal is not supported"); - } - - private void setUncompressedValues(long[] data) { - this.measureChunkStore = - MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, data.length); - this.measureChunkStore.putData(data); - } - - @Override public void freeMemory() { - this.measureChunkStore.freeMemory(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java deleted file mode 100644 index f0b1c99..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/none/CompressionNoneShort.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.compression.none; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.compression.Compressor; -import org.apache.carbondata.core.datastorage.compression.CompressorFactory; -import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder; -import org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory; -import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore; -import org.apache.carbondata.core.util.ValueCompressionUtil; -import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; - -public class CompressionNoneShort extends ValueCompressionHolder<short[]> { - /** - * Attribute for Carbon LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(CompressionNoneShort.class.getName()); - - /** - * shortCompressor. - */ - private static Compressor compressor = CompressorFactory.getInstance().getCompressor(); - - /** - * value. - */ - private short[] shortValue; - - private MeasureDataChunkStore<short[]> measureChunkStore; - - private DataType actualDataType; - - public CompressionNoneShort(DataType actualDataType) { - this.actualDataType = actualDataType; - } - - @Override public void setValue(short[] shortValue) { - this.shortValue = shortValue; - } - - @Override public short[] getValue() { return this.shortValue; } - - @Override - public void uncompress(DataType dataType, byte[] data, int offset, int length, - int decimalPlaces, Object maxValueObject) { - super.unCompress(compressor, dataType, data, offset, length); - setUncompressedValues(shortValue); - } - - @Override public void compress() { - compressedValue = super.compress(compressor, DataType.DATA_SHORT, shortValue); - } - - @Override public void setValueInBytes(byte[] value) { - ByteBuffer buffer = ByteBuffer.wrap(value); - shortValue = ValueCompressionUtil.convertToShortArray(buffer, value.length); - } - - @Override public long getLongValue(int index) { - return measureChunkStore.getShort(index); - } - - @Override public double getDoubleValue(int index) { - return measureChunkStore.getShort(index); - } - - @Override public BigDecimal getBigDecimalValue(int index) { - throw new UnsupportedOperationException( - "Big decimal is not defined for CompressionNonShort"); - } - - private void setUncompressedValues(short[] data) { - this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE - .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length); - this.measureChunkStore.putData(data); - } - - @Override public void freeMemory() { - this.measureChunkStore.freeMemory(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java b/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java deleted file mode 100644 index 8233e90..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/compression/type/CompressionBigDecimal.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.datastorage.compression.type; - - -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder; -import org.apache.carbondata.core.util.BigDecimalCompressionFinder; -import org.apache.carbondata.core.util.ValueCompressionUtil.DataType; - -/** - * Big decimal compression - */ -public class CompressionBigDecimal<T> extends ValueCompressionHolder<T> { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(CompressionBigDecimal.class.getName()); - - private BigDecimalCompressionFinder compressionFinder; - - /** - * leftPart before decimal - */ - private ValueCompressionHolder leftPart; - - /** - * rightPart after decimal - */ - private ValueCompressionHolder rightPart; - - private double divisionFactor; - - private boolean isDecimalPlacesNotZero; - - public CompressionBigDecimal(BigDecimalCompressionFinder compressionFinder, - ValueCompressionHolder leftPart, ValueCompressionHolder rightPart) { - this.compressionFinder = compressionFinder; - this.leftPart = leftPart; - this.rightPart = rightPart; - } - - @Override public void setValue(T value) { - Object[] values = (Object[]) value; - leftPart.setValue(values[0]); - rightPart.setValue(values[1]); - } - - @Override - public void uncompress(DataType dataType, byte[] data, int offset, int length, - int decimalPlaces, Object maxValueObject) { - if (decimalPlaces > 0) { - this.isDecimalPlacesNotZero = true; - } - this.divisionFactor = Math.pow(10, decimalPlaces); - - ByteBuffer buffer = ByteBuffer.wrap(data, offset, length); - int leftPathLength = buffer.getInt(); - int rightPartLength = length - leftPathLength - CarbonCommonConstants.INT_SIZE_IN_BYTE; - Long[] maxValue = (Long[]) maxValueObject; - leftPart.uncompress(compressionFinder.getLeftConvertedDataType(), data, - offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, leftPathLength, decimalPlaces, - maxValue[0]); - rightPart.uncompress(compressionFinder.getRightConvertedDataType(), data, - offset + CarbonCommonConstants.INT_SIZE_IN_BYTE + leftPathLength, rightPartLength, - decimalPlaces, maxValue[1]); - } - - @Override public long getLongValue(int index) { - throw new UnsupportedOperationException( - "Long is not defined for CompressionBigDecimal"); - } - - @Override public double getDoubleValue(int index) { - throw new UnsupportedOperationException( - "Double is not defined for CompressionBigDecimal"); - } - - @Override public BigDecimal getBigDecimalValue(int index) { - long leftValue = leftPart.getLongValue(index); - long rightValue = 0; - if (isDecimalPlacesNotZero) { - rightValue = rightPart.getLongValue(index); - } - String decimalPart = Double.toString(rightValue / this.divisionFactor); - String bigdStr = Long.toString(leftValue) + CarbonCommonConstants.POINT + decimalPart - .substring(decimalPart.indexOf(".") + 1, decimalPart.length()); - return new BigDecimal(bigdStr); - } - - @Override public T getValue() { - Object[] values = new Object[2]; - values[0] = leftPart; - values[1] = rightPart; - return (T) values; - } - - @Override public void setValueInBytes(byte[] value) { - LOGGER.error("setValueInBytes() is not defined for CompressionBigDecimal"); - } - - @Override public void compress() { - leftPart.compress(); - rightPart.compress(); - } - - @Override - public byte[] getCompressedData() { - byte[] leftdata = leftPart.getCompressedData(); - byte[] rightdata = rightPart.getCompressedData(); - ByteBuffer byteBuffer = ByteBuffer - .allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + leftdata.length - + rightdata.length); - byteBuffer.putInt(leftdata.length); - byteBuffer.put(leftdata); - byteBuffer.put(rightdata); - byteBuffer.flip(); - return byteBuffer.array(); - } - - @Override public void freeMemory() { - leftPart.freeMemory(); - rightPart.freeMemory(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java deleted file mode 100644 index 29ff254..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonReadDataHolder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.dataholder; - -import java.math.BigDecimal; - -import org.apache.carbondata.core.datastorage.compression.ValueCompressionHolder; - -// This class is used with Uncompressor to hold the decompressed column chunk in memory -public class CarbonReadDataHolder { - - private ValueCompressionHolder unCompressValue; - - public CarbonReadDataHolder(ValueCompressionHolder unCompressValue) { - this.unCompressValue = unCompressValue; - } - - public long getReadableLongValueByIndex(int index) { - return this.unCompressValue.getLongValue(index); - } - - public BigDecimal getReadableBigDecimalValueByIndex(int index) { - return this.unCompressValue.getBigDecimalValue(index); - } - - public double getReadableDoubleValueByIndex(int index) { - return this.unCompressValue.getDoubleValue(index); - } - - public void freeMemory() { - unCompressValue.freeMemory(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java deleted file mode 100644 index e6d9123..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/dataholder/CarbonWriteDataHolder.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.dataholder; - -public class CarbonWriteDataHolder { - /** - * doubleValues - */ - private double[] doubleValues; - - /** - * longValues - */ - private long[] longValues; - - /** - * bigDecimal left part - */ - private long[] bigDecimalLeftValues; - - /** - * bigDecimal right part - */ - private long[] bigDecimalRightValues; - /** - * byteValues - */ - private byte[][] byteValues; - - /** - * byteValues for no dictionary and non kettle flow. - */ - private byte[][][] byteValuesForNonDictionary; - - /** - * byteValues - */ - private byte[][][] columnByteValues; - - /** - * size - */ - private int size; - - /** - * totalSize - */ - private int totalSize; - - /** - * Method to initialise double array - * - * @param size - */ - public void initialiseDoubleValues(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - doubleValues = new double[size]; - } - - public void reset() { - size = 0; - totalSize = 0; - } - - /** - * Method to initialise double array - * TODO Remove after kettle flow got removed. - * - * @param size - */ - public void initialiseByteArrayValues(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - - byteValues = new byte[size][]; - columnByteValues = new byte[size][][]; - } - - /** - * Method to initialise byte array - * - * @param size - */ - public void initialiseByteArrayValuesWithOutKettle(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - - byteValues = new byte[size][]; - } - - public void initialiseByteArrayValuesForNonDictionary(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - - byteValuesForNonDictionary = new byte[size][][]; - } - - /** - * Method to initialise long array - * - * @param size - */ - public void initialiseLongValues(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - longValues = new long[size]; - } - - public void initialiseBigDecimalValues(int size) { - if (size < 1) { - throw new IllegalArgumentException("Invalid array size"); - } - bigDecimalLeftValues = new long[size]; - bigDecimalRightValues = new long[size]; - } - - /** - * set double value by index - * - * @param index - * @param value - */ - public void setWritableDoubleValueByIndex(int index, Object value) { - doubleValues[index] = (Double) value; - size++; - } - - /** - * set double value by index - * - * @param index - * @param value - */ - public void setWritableLongValueByIndex(int index, Object value) { - longValues[index] = (Long) value; - size++; - } - - /** - * set bigdecimal value by index - * - * @param index - * @param value - */ - public void setWritableBigDecimalValueByIndex(int index, long[] value) { - bigDecimalLeftValues[index] = value[0]; - bigDecimalRightValues[index] = value[1]; - size++; - } - /** - * set byte array value by index - * - * @param index - * @param value - */ - public void setWritableByteArrayValueByIndex(int index, byte[] value) { - byteValues[index] = value; - size++; - if (null != value) totalSize += value.length; - } - - public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) { - byteValuesForNonDictionary[index] = value; - size++; - if (null != value) totalSize += value.length; - } - - /** - * set byte array value by index - */ - public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) { - int l = 0; - columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][]; - for (int i = mdKeyIndex + 1; i < columnData.length; i++) { - columnByteValues[index][l++] = (byte[]) columnData[i]; - } - } - - /** - * Get Writable Double Values - */ - public double[] getWritableDoubleValues() { - if (size < doubleValues.length) { - double[] temp = new double[size]; - System.arraycopy(doubleValues, 0, temp, 0, size); - doubleValues = temp; - } - return doubleValues; - } - - /** - * Get writable byte array values - */ - public byte[] getWritableByteArrayValues() { - byte[] temp = new byte[totalSize]; - int startIndexToCopy = 0; - for (int i = 0; i < size; i++) { - System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length); - startIndexToCopy += byteValues[i].length; - } - return temp; - } - - public byte[][] getByteArrayValues() { - if (size < byteValues.length) { - byte[][] temp = new byte[size][]; - System.arraycopy(byteValues, 0, temp, 0, size); - byteValues = temp; - } - return byteValues; - } - - public byte[][][] getNonDictByteArrayValues() { - if (size < byteValuesForNonDictionary.length) { - byte[][][] temp = new byte[size][][]; - System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size); - byteValuesForNonDictionary = temp; - } - return byteValuesForNonDictionary; - } - - /** - * Get Writable Double Values - * - * @return - */ - public long[] getWritableLongValues() { - if (size < longValues.length) { - long[] temp = new long[size]; - System.arraycopy(longValues, 0, temp, 0, size); - longValues = temp; - } - return longValues; - } - - /** - * Get Writable bigdecimal Values - * - * @return - */ - public long[][] getWritableBigDecimalValues() { - long[][] bigDecimalValues = new long[2][]; - if (size < bigDecimalLeftValues.length) { - long[] temp = new long[size]; - System.arraycopy(bigDecimalLeftValues, 0, temp, 0, size); - bigDecimalLeftValues = temp; - } - if (size < bigDecimalRightValues.length) { - long[] temp = new long[size]; - System.arraycopy(bigDecimalRightValues, 0, temp, 0, size); - bigDecimalRightValues = temp; - } - bigDecimalValues[0]= bigDecimalLeftValues; - bigDecimalValues[1] = bigDecimalRightValues; - return bigDecimalValues; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java deleted file mode 100644 index 8963702..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AbstractDFSCarbonFile.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public abstract class AbstractDFSCarbonFile implements CarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName()); - protected FileStatus fileStatus; - protected FileSystem fs; - - public AbstractDFSCarbonFile(String filePath) { - filePath = filePath.replace("\\", "/"); - Path path = new Path(filePath); - try { - fs = path.getFileSystem(FileFactory.getConfiguration()); - fileStatus = fs.getFileStatus(path); - } catch (IOException e) { - LOGGER.error("Exception occured:" + e.getMessage()); - } - } - - public AbstractDFSCarbonFile(Path path) { - try { - fs = path.getFileSystem(FileFactory.getConfiguration()); - fileStatus = fs.getFileStatus(path); - } catch (IOException e) { - LOGGER.error("Exception occured:" + e.getMessage()); - } - } - - public AbstractDFSCarbonFile(FileStatus fileStatus) { - this.fileStatus = fileStatus; - } - - @Override public boolean createNewFile() { - Path path = fileStatus.getPath(); - try { - return fs.createNewFile(path); - } catch (IOException e) { - return false; - } - } - - @Override public String getAbsolutePath() { - return fileStatus.getPath().toString(); - } - - @Override public String getName() { - return fileStatus.getPath().getName(); - } - - @Override public boolean isDirectory() { - return fileStatus.isDirectory(); - } - - @Override public boolean exists() { - try { - if (null != fileStatus) { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - return fs.exists(fileStatus.getPath()); - } - } catch (IOException e) { - LOGGER.error("Exception occured:" + e.getMessage()); - } - return false; - } - - @Override public String getCanonicalPath() { - return getAbsolutePath(); - } - - @Override public String getPath() { - return getAbsolutePath(); - } - - @Override public long getSize() { - return fileStatus.getLen(); - } - - public boolean renameTo(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - return fs.rename(fileStatus.getPath(), new Path(changetoName)); - } catch (IOException e) { - LOGGER.error("Exception occured:" + e.getMessage()); - return false; - } - } - - public boolean delete() { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - return fs.delete(fileStatus.getPath(), true); - } catch (IOException e) { - LOGGER.error("Exception occured:" + e.getMessage()); - return false; - } - } - - @Override public long getLastModifiedTime() { - return fileStatus.getModificationTime(); - } - - @Override public boolean setLastModifiedTime(long timestamp) { - try { - fs.setTimes(fileStatus.getPath(), timestamp, timestamp); - } catch (IOException e) { - return false; - } - return true; - } - - /** - * This method will delete the data in file data from a given offset - */ - @Override public boolean truncate(String fileName, long validDataEndOffset) { - DataOutputStream dataOutputStream = null; - DataInputStream dataInputStream = null; - boolean fileTruncatedSuccessfully = false; - // if bytes to read less than 1024 then buffer size should be equal to the given offset - int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ? - CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR : - (int) validDataEndOffset; - // temporary file name - String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; - FileFactory.FileType fileType = FileFactory.getFileType(fileName); - try { - CarbonFile tempFile = null; - // delete temporary file if it already exists at a given path - if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - tempFile.delete(); - } - // create new temporary file - FileFactory.createNewFile(tempWriteFilePath, fileType); - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - byte[] buff = new byte[bufferSize]; - dataInputStream = FileFactory.getDataInputStream(fileName, fileType); - // read the data - int read = dataInputStream.read(buff, 0, buff.length); - dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath, fileType); - dataOutputStream.write(buff, 0, read); - long remaining = validDataEndOffset - read; - // anytime we should not cross the offset to be read - while (remaining > 0) { - if (remaining > bufferSize) { - buff = new byte[bufferSize]; - } else { - buff = new byte[(int) remaining]; - } - read = dataInputStream.read(buff, 0, buff.length); - dataOutputStream.write(buff, 0, read); - remaining = remaining - read; - } - CarbonUtil.closeStreams(dataInputStream, dataOutputStream); - // rename the temp file to original file - tempFile.renameForce(fileName); - fileTruncatedSuccessfully = true; - } catch (IOException e) { - LOGGER.error("Exception occured while truncating the file " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(dataOutputStream, dataInputStream); - } - return fileTruncatedSuccessfully; - } - - /** - * This method will be used to check whether a file has been modified or not - * - * @param fileTimeStamp time to be compared with latest timestamp of file - * @param endOffset file length to be compared with current length of file - * @return - */ - @Override public boolean isFileModified(long fileTimeStamp, long endOffset) { - boolean isFileModified = false; - if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) { - isFileModified = true; - } - return isFileModified; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java deleted file mode 100644 index e4ce2b6..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/AlluxioCarbonFile.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; - - - -public class AlluxioCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(AlluxioCarbonFile.class.getName()); - - public AlluxioCarbonFile(String filePath) { - super(filePath); - } - - public AlluxioCarbonFile(Path path) { - super(path); - } - - public AlluxioCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new AlluxioCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override - public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new AlluxioCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java deleted file mode 100644 index 0ac4d52..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFile.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -public interface CarbonFile { - - String getAbsolutePath(); - - CarbonFile[] listFiles(CarbonFileFilter fileFilter); - - CarbonFile[] listFiles(); - - String getName(); - - boolean isDirectory(); - - boolean exists(); - - String getCanonicalPath(); - - CarbonFile getParentFile(); - - String getPath(); - - long getSize(); - - boolean renameTo(String changetoName); - - boolean renameForce(String changetoName); - - boolean delete(); - - boolean createNewFile(); - - long getLastModifiedTime(); - - boolean setLastModifiedTime(long timestamp); - - boolean truncate(String fileName, long validDataEndOffset); - - /** - * This method will be used to check whether a file has been modified or not - * - * @param fileTimeStamp time to be compared with latest timestamp of file - * @param endOffset file length to be compared with current length of file - * @return - */ - boolean isFileModified(long fileTimeStamp, long endOffset); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java deleted file mode 100644 index e382f92..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/CarbonFileFilter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -public interface CarbonFileFilter { - boolean accept(CarbonFile file); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java deleted file mode 100644 index a2aaca1..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/HDFSCarbonFile.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; - -public class HDFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(HDFSCarbonFile.class.getName()); - - public HDFSCarbonFile(String filePath) { - super(filePath); - } - - public HDFSCarbonFile(Path path) { - super(path); - } - - public HDFSCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new HDFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override - public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new HDFSCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { - ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), - org.apache.hadoop.fs.Options.Rename.OVERWRITE); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured: " + e.getMessage()); - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java deleted file mode 100644 index d7866c1..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/LocalCarbonFile.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.filesystem; - -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.FileChannel; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.impl.FileFactory; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.hadoop.fs.Path; - -public class LocalCarbonFile implements CarbonFile { - private static final LogService LOGGER = - LogServiceFactory.getLogService(LocalCarbonFile.class.getName()); - private File file; - - public LocalCarbonFile(String filePath) { - Path pathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); - file = new File(pathWithoutSchemeAndAuthority.toString()); - } - - public LocalCarbonFile(File file) { - this.file = file; - } - - @Override public String getAbsolutePath() { - return file.getAbsolutePath(); - } - - @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - if (!file.isDirectory()) { - return null; - } - - File[] files = file.listFiles(new FileFilter() { - - @Override public boolean accept(File pathname) { - return fileFilter.accept(new LocalCarbonFile(pathname)); - } - }); - - if (files == null) { - return new CarbonFile[0]; - } - - CarbonFile[] carbonFiles = new CarbonFile[files.length]; - - for (int i = 0; i < carbonFiles.length; i++) { - carbonFiles[i] = new LocalCarbonFile(files[i]); - } - - return carbonFiles; - } - - @Override public String getName() { - return file.getName(); - } - - @Override public boolean isDirectory() { - return file.isDirectory(); - } - - @Override public boolean exists() { - if (file != null) { - return file.exists(); - } - return false; - } - - @Override public String getCanonicalPath() { - try { - return file.getCanonicalPath(); - } catch (IOException e) { - LOGGER - .error(e, "Exception occured" + e.getMessage()); - } - return null; - } - - @Override public CarbonFile getParentFile() { - return new LocalCarbonFile(file.getParentFile()); - } - - @Override public String getPath() { - return file.getPath(); - } - - @Override public long getSize() { - return file.length(); - } - - public boolean renameTo(String changetoName) { - return file.renameTo(new File(changetoName)); - } - - public boolean delete() { - return file.delete(); - } - - @Override public CarbonFile[] listFiles() { - - if (!file.isDirectory()) { - return null; - } - File[] files = file.listFiles(); - if (files == null) { - return new CarbonFile[0]; - } - CarbonFile[] carbonFiles = new CarbonFile[files.length]; - for (int i = 0; i < carbonFiles.length; i++) { - carbonFiles[i] = new LocalCarbonFile(files[i]); - } - - return carbonFiles; - - } - - @Override public boolean createNewFile() { - try { - return file.createNewFile(); - } catch (IOException e) { - return false; - } - } - - @Override public long getLastModifiedTime() { - return file.lastModified(); - } - - @Override public boolean setLastModifiedTime(long timestamp) { - return file.setLastModified(timestamp); - } - - /** - * This method will delete the data in file data from a given offset - */ - @Override public boolean truncate(String fileName, long validDataEndOffset) { - FileChannel source = null; - FileChannel destination = null; - boolean fileTruncatedSuccessfully = false; - // temporary file name - String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION; - FileFactory.FileType fileType = FileFactory.getFileType(fileName); - try { - CarbonFile tempFile = null; - // delete temporary file if it already exists at a given path - if (FileFactory.isFileExist(tempWriteFilePath, fileType)) { - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - tempFile.delete(); - } - // create new temporary file - FileFactory.createNewFile(tempWriteFilePath, fileType); - tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - source = new FileInputStream(fileName).getChannel(); - destination = new FileOutputStream(tempWriteFilePath).getChannel(); - long read = destination.transferFrom(source, 0, validDataEndOffset); - long totalBytesRead = read; - long remaining = validDataEndOffset - totalBytesRead; - // read till required data offset is not reached - while (remaining > 0) { - read = destination.transferFrom(source, totalBytesRead, remaining); - totalBytesRead = totalBytesRead + read; - remaining = remaining - totalBytesRead; - } - CarbonUtil.closeStreams(source, destination); - // rename the temp file to original file - tempFile.renameForce(fileName); - fileTruncatedSuccessfully = true; - } catch (IOException e) { - LOGGER.error("Exception occured while truncating the file " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(source, destination); - } - return fileTruncatedSuccessfully; - } - - /** - * This method will be used to check whether a file has been modified or not - * - * @param fileTimeStamp time to be compared with latest timestamp of file - * @param endOffset file length to be compared with current length of file - * @return - */ - @Override public boolean isFileModified(long fileTimeStamp, long endOffset) { - boolean isFileModified = false; - if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) { - isFileModified = true; - } - return isFileModified; - } - - @Override public boolean renameForce(String changetoName) { - File destFile = new File(changetoName); - if (destFile.exists()) { - if (destFile.delete()) { - return file.renameTo(new File(changetoName)); - } - } - - return file.renameTo(new File(changetoName)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java deleted file mode 100644 index 3fcf387..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/filesystem/ViewFSCarbonFile.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.datastorage.filesystem; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastorage.impl.FileFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.viewfs.ViewFileSystem; - -public class ViewFSCarbonFile extends AbstractDFSCarbonFile { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(ViewFSCarbonFile.class.getName()); - - public ViewFSCarbonFile(String filePath) { - super(filePath); - } - - public ViewFSCarbonFile(Path path) { - super(path); - } - - public ViewFSCarbonFile(FileStatus fileStatus) { - super(fileStatus); - } - - /** - * @param listStatus - * @return - */ - private CarbonFile[] getFiles(FileStatus[] listStatus) { - if (listStatus == null) { - return new CarbonFile[0]; - } - CarbonFile[] files = new CarbonFile[listStatus.length]; - for (int i = 0; i < files.length; i++) { - files[i] = new ViewFSCarbonFile(listStatus[i]); - } - return files; - } - - @Override - public CarbonFile[] listFiles() { - FileStatus[] listStatus = null; - try { - if (null != fileStatus && fileStatus.isDirectory()) { - Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listStatus(path); - } else { - return null; - } - } catch (IOException ex) { - LOGGER.error("Exception occured" + ex.getMessage()); - return new CarbonFile[0]; - } - return getFiles(listStatus); - } - - @Override - public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { - CarbonFile[] files = listFiles(); - if (files != null && files.length >= 1) { - List<CarbonFile> fileList = new ArrayList<CarbonFile>(files.length); - for (int i = 0; i < files.length; i++) { - if (fileFilter.accept(files[i])) { - fileList.add(files[i]); - } - } - if (fileList.size() >= 1) { - return fileList.toArray(new CarbonFile[fileList.size()]); - } else { - return new CarbonFile[0]; - } - } - return files; - } - - @Override public CarbonFile getParentFile() { - Path parent = fileStatus.getPath().getParent(); - return null == parent ? null : new ViewFSCarbonFile(parent); - } - - @Override - public boolean renameForce(String changetoName) { - FileSystem fs; - try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof ViewFileSystem) { - fs.delete(new Path(changetoName), true); - fs.rename(fileStatus.getPath(), new Path(changetoName)); - return true; - } else { - return false; - } - } catch (IOException e) { - LOGGER.error("Exception occured" + e.getMessage()); - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java deleted file mode 100644 index 997543b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/CompressedDataMeasureDataWrapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.impl; - -import org.apache.carbondata.core.datastorage.MeasureDataWrapper; -import org.apache.carbondata.core.datastorage.dataholder.CarbonReadDataHolder; - -public class CompressedDataMeasureDataWrapper implements MeasureDataWrapper { - - private final CarbonReadDataHolder[] values; - - public CompressedDataMeasureDataWrapper(final CarbonReadDataHolder[] values) { - this.values = values; - } - - @Override public CarbonReadDataHolder[] getValues() { - return values; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java deleted file mode 100644 index 443344f..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/DFSFileHolderImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.core.datastorage.impl; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.FileHolder; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - - -public class DFSFileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FSDataInputStream> fileNameAndStreamCache; - - public DFSFileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length, offset); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - private FSDataInputStream updateCache(String filePath) throws IOException { - FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - Path pt = new Path(filePath); - FileSystem fs = FileSystem.get(FileFactory.getConfiguration()); - fileChannel = fs.open(pt); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size, long offset) throws IOException { - byte[] byteBffer = new byte[size]; - channel.seek(offset); - channel.readFully(byteBffer); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private byte[] read(FSDataInputStream channel, int size) throws IOException { - byte[] byteBffer = new byte[size]; - channel.readFully(byteBffer); - return byteBffer; - } - - @Override public int readInt(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readInt(); - } - - @Override public long readDouble(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public void finish() throws IOException { - for (Entry<String, FSDataInputStream> entry : fileNameAndStreamCache.entrySet()) { - FSDataInputStream channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return read(fileChannel, length); - } - - @Override public long readLong(String filePath, long offset) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - fileChannel.seek(offset); - return fileChannel.readLong(); - } - - @Override public int readInt(String filePath) throws IOException { - FSDataInputStream fileChannel = updateCache(filePath); - return fileChannel.readInt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java deleted file mode 100644 index ca9956d..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileFactory.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.impl; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.GZIPInputStream; - -import org.apache.carbondata.core.datastorage.FileHolder; -import org.apache.carbondata.core.datastorage.filesystem.AlluxioCarbonFile; -import org.apache.carbondata.core.datastorage.filesystem.CarbonFile; -import org.apache.carbondata.core.datastorage.filesystem.HDFSCarbonFile; -import org.apache.carbondata.core.datastorage.filesystem.LocalCarbonFile; -import org.apache.carbondata.core.datastorage.filesystem.ViewFSCarbonFile; -import org.apache.carbondata.core.util.CarbonUtil; - -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.GzipCodec; - -public final class FileFactory { - private static Configuration configuration = null; - - static { - configuration = new Configuration(); - configuration.addResource(new Path("../core-default.xml")); - } - - private FileFactory() { - - } - - public static Configuration getConfiguration() { - return configuration; - } - - public static FileHolder getFileHolder(FileType fileType) { - switch (fileType) { - case LOCAL: - return new FileHolderImpl(); - case HDFS: - case ALLUXIO: - case VIEWFS: - return new DFSFileHolderImpl(); - default: - return new FileHolderImpl(); - } - } - - public static FileType getFileType(String path) { - if (path.startsWith(CarbonUtil.HDFS_PREFIX)) { - return FileType.HDFS; - } - else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) { - return FileType.ALLUXIO; - } - else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) { - return FileType.VIEWFS; - } - return FileType.LOCAL; - } - - public static CarbonFile getCarbonFile(String path, FileType fileType) { - switch (fileType) { - case LOCAL: - return new LocalCarbonFile(path); - case HDFS: - return new HDFSCarbonFile(path); - case ALLUXIO: - return new AlluxioCarbonFile(path); - case VIEWFS: - return new ViewFSCarbonFile(path); - default: - return new LocalCarbonFile(path); - } - } - - public static DataInputStream getDataInputStream(String path, FileType fileType) - throws IOException { - return getDataInputStream(path, fileType, -1); - } - - public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize) - throws IOException { - path = path.replace("\\", "/"); - boolean gzip = path.endsWith(".gz"); - boolean bzip2 = path.endsWith(".bz2"); - InputStream stream; - switch (fileType) { - case LOCAL: - if (gzip) { - stream = new GZIPInputStream(new FileInputStream(path)); - } else if (bzip2) { - stream = new BZip2CompressorInputStream(new FileInputStream(path)); - } else { - stream = new FileInputStream(path); - } - break; - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - if (bufferSize == -1) { - stream = fs.open(pt); - } else { - stream = fs.open(pt, bufferSize); - } - if (gzip) { - GzipCodec codec = new GzipCodec(); - stream = codec.createInputStream(stream); - } else if (bzip2) { - BZip2Codec codec = new BZip2Codec(); - stream = codec.createInputStream(stream); - } - break; - default: - throw new UnsupportedOperationException("unsupported file system"); - } - return new DataInputStream(new BufferedInputStream(stream)); - } - - /** - * return the datainputStream which is seek to the offset of file - * - * @param path - * @param fileType - * @param bufferSize - * @param offset - * @return DataInputStream - * @throws IOException - */ - public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize, - long offset) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataInputStream stream = fs.open(pt, bufferSize); - stream.seek(offset); - return new DataInputStream(new BufferedInputStream(stream)); - default: - FileInputStream fis = new FileInputStream(path); - long actualSkipSize = 0; - long skipSize = offset; - while (actualSkipSize != offset) { - actualSkipSize += fis.skip(skipSize); - skipSize = skipSize - actualSkipSize; - } - return new DataInputStream(new BufferedInputStream(fis)); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType) - throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = fs.create(pt, true); - return stream; - default: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize, - boolean append) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path, append), bufferSize)); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = null; - if (append) { - // append to a file only if file already exists else file not found - // exception will be thrown by hdfs - if (CarbonUtil.isFileExists(path)) { - stream = fs.append(pt, bufferSize); - } else { - stream = fs.create(pt, true, bufferSize); - } - } else { - stream = fs.create(pt, true, bufferSize); - } - return stream; - default: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - } - } - - public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize, - long blockSize) throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = - fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize); - return stream; - default: - return new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(path), bufferSize)); - } - } - - /** - * This method checks the given path exists or not and also is it file or - * not if the performFileCheck is true - * - * @param filePath - Path - * @param fileType - FileType Local/HDFS - * @param performFileCheck - Provide false for folders, true for files and - */ - public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck) - throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - if (performFileCheck) { - return fs.exists(path) && fs.isFile(path); - } else { - return fs.exists(path); - } - - case LOCAL: - default: - File defaultFile = new File(filePath); - - if (performFileCheck) { - return defaultFile.exists() && defaultFile.isFile(); - } else { - return defaultFile.exists(); - } - } - } - - /** - * This method checks the given path exists or not and also is it file or - * not if the performFileCheck is true - * - * @param filePath - Path - * @param fileType - FileType Local/HDFS - */ - public static boolean isFileExist(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.exists(path); - - case LOCAL: - default: - File defaultFile = new File(filePath); - return defaultFile.exists(); - } - } - - public static boolean createNewFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.createNewFile(path); - - case LOCAL: - default: - File file = new File(filePath); - return file.createNewFile(); - } - } - - public static boolean deleteFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.delete(path, true); - - case LOCAL: - default: - File file = new File(filePath); - return deleteAllFilesOfDir(file); - } - } - - public static boolean deleteAllFilesOfDir(File path) { - if (!path.exists()) { - return true; - } - if (path.isFile()) { - return path.delete(); - } - File[] files = path.listFiles(); - for (int i = 0; i < files.length; i++) { - deleteAllFilesOfDir(files[i]); - } - return path.delete(); - } - - public static boolean mkdirs(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.mkdirs(path); - case LOCAL: - default: - File file = new File(filePath); - return file.mkdirs(); - } - } - - /** - * for getting the dataoutput stream using the hdfs filesystem append API. - * - * @param path - * @param fileType - * @return - * @throws IOException - */ - public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType) - throws IOException { - path = path.replace("\\", "/"); - switch (fileType) { - case LOCAL: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true))); - case HDFS: - case ALLUXIO: - case VIEWFS: - Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(configuration); - FSDataOutputStream stream = fs.append(pt); - return stream; - default: - return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path))); - } - } - - /** - * for creating a new Lock file and if it is successfully created - * then in case of abrupt shutdown then the stream to that file will be closed. - * - * @param filePath - * @param fileType - * @return - * @throws IOException - */ - public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException { - filePath = filePath.replace("\\", "/"); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - if (fs.createNewFile(path)) { - fs.deleteOnExit(path); - return true; - } - return false; - case LOCAL: - default: - File file = new File(filePath); - return file.createNewFile(); - } - } - - public enum FileType { - LOCAL, HDFS, ALLUXIO, VIEWFS - } - - /** - * below method will be used to update the file path - * for local type - * it removes the file:/ from the path - * - * @param filePath - * @return updated file path without url for local - */ - public static String getUpdatedFilePath(String filePath) { - FileType fileType = getFileType(filePath); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - return filePath; - case LOCAL: - default: - Path pathWithoutSchemeAndAuthority = - Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); - return pathWithoutSchemeAndAuthority.toString(); - } - } - - /** - * It computes size of directory - * - * @param filePath - * @return size in bytes - * @throws IOException - */ - public static long getDirectorySize(String filePath) throws IOException { - FileType fileType = getFileType(filePath); - switch (fileType) { - case HDFS: - case ALLUXIO: - case VIEWFS: - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(configuration); - return fs.getContentSummary(path).getLength(); - case LOCAL: - default: - File file = new File(filePath); - return FileUtils.sizeOfDirectory(file); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java deleted file mode 100644 index 843ec5a..0000000 --- a/core/src/main/java/org/apache/carbondata/core/datastorage/impl/FileHolderImpl.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.core.datastorage.impl; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.FileHolder; - -public class FileHolderImpl implements FileHolder { - /** - * cache to hold filename and its stream - */ - private Map<String, FileChannel> fileNameAndStreamCache; - - /** - * FileHolderImpl Constructor - * It will create the cache - */ - public FileHolderImpl() { - this.fileNameAndStreamCache = - new HashMap<String, FileChannel>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - public FileHolderImpl(int capacity) { - this.fileNameAndStreamCache = new HashMap<String, FileChannel>(capacity); - } - - /** - * This method will be used to read the byte array from file based on offset - * and length(number of bytes) need to read - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, long offset, int length) - throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length, offset); - return byteBffer.array(); - } - - /** - * This method will be used to close all the streams currently present in the cache - */ - @Override public void finish() throws IOException { - for (Entry<String, FileChannel> entry : fileNameAndStreamCache.entrySet()) { - FileChannel channel = entry.getValue(); - if (null != channel) { - channel.close(); - } - } - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public int readInt(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE, offset); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @return read int - */ - @Override public int readInt(String filePath) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.INT_SIZE_IN_BYTE); - return byteBffer.getInt(); - } - - /** - * This method will be used to read int from file from postion(offset), here - * length will be always 4 bacause int byte size if 4 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read int - */ - @Override public long readDouble(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - - /** - * This method will be used to check whether stream is already present in - * cache or not for filepath if not present then create it and then add to - * cache, other wise get from cache - * - * @param filePath fully qualified file path - * @return channel - */ - private FileChannel updateCache(String filePath) throws FileNotFoundException { - FileChannel fileChannel = fileNameAndStreamCache.get(filePath); - if (null == fileChannel) { - FileInputStream stream = new FileInputStream(filePath); - fileChannel = stream.getChannel(); - fileNameAndStreamCache.put(filePath, fileChannel); - } - return fileChannel; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @param offset position - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size, long offset) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.position(offset); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - /** - * This method will be used to read from file based on number of bytes to be read and positon - * - * @param channel file channel - * @param size number of bytes - * @return byte buffer - */ - private ByteBuffer read(FileChannel channel, int size) throws IOException { - ByteBuffer byteBffer = ByteBuffer.allocate(size); - channel.read(byteBffer); - byteBffer.rewind(); - return byteBffer; - } - - - /** - * This method will be used to read the byte array from file based on length(number of bytes) - * - * @param filePath fully qualified file path - * @param length number of bytes to be read - * @return read byte array - */ - @Override public byte[] readByteArray(String filePath, int length) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, length); - return byteBffer.array(); - } - - /** - * This method will be used to read long from file from postion(offset), here - * length will be always 8 bacause int byte size is 8 - * - * @param filePath fully qualified file path - * @param offset reading start position, - * @return read long - */ - @Override public long readLong(String filePath, long offset) throws IOException { - FileChannel fileChannel = updateCache(filePath); - ByteBuffer byteBffer = read(fileChannel, CarbonCommonConstants.LONG_SIZE_IN_BYTE, offset); - return byteBffer.getLong(); - } - -}