Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 0aab4e7c6 -> 96fe233a2
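For reviewers new to this code path: CarbonStreamRecordReader is a Hadoop RecordReader<Void, Object> over carbondata stream files, returning one InternalRow per row (or a ColumnarBatch when vector reading is enabled). Below is a minimal sketch of how a reader of this shape is driven; the `split` and `context` values are assumed to come from CarbonTableInputFormat and the task runtime, not constructed by hand as implied here.

    // Sketch only: drive the reader the way the MR/Spark task machinery would.
    CarbonStreamRecordReader reader = new CarbonStreamRecordReader();
    reader.setVectorReader(false);       // row mode; true yields ColumnarBatch values
    reader.initialize(split, context);   // derives a QueryModel if none was set
    try {
      while (reader.nextKeyValue()) {
        Object value = reader.getCurrentValue();  // InternalRow in row mode
        // ... consume value ...
      }
    } finally {
      reader.close();
    }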
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java new file mode 100644 index 0000000..c4b501d --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java @@ -0,0 +1,758 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonUtil; +import 
org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.InputMetricsStats; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Stream record reader + */ +public class CarbonStreamRecordReader extends RecordReader<Void, Object> { + // vector reader + private boolean isVectorReader; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] storageColumns; + private boolean[] isRequired; + private DataType[] measureDataTypes; + private int dimensionCount; + private int measureCount; + + // input + private FileSplit fileSplit; + private Configuration hadoopConf; + private StreamBlockletReader input; + private boolean isFirstRow = true; + private QueryModel model; + + // decode data + private BitSet allNonNull; + private boolean[] isNoDictColumn; + private DirectDictionaryGenerator[] directDictionaryGenerators; + private CacheProvider cacheProvider; + private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; + private GenericQueryType[] queryTypes; + + // vectorized reader + private StructType outputSchema; + private ColumnarBatch columnarBatch; + private boolean isFinished = false; + + // filter + private FilterExecuter filter; + private boolean[] isFilterRequired; + private Object[] filterValues; + private RowIntf filterRow; + private int[] filterMap; + + // output + private CarbonColumn[] projection; + private boolean[] isProjectionRequired; + private int[] projectionMap; + private Object[] outputValues; + private InternalRow outputRow; + + // empty project, null filter + private boolean skipScanData; + + // return raw row for handoff + private boolean useRawRow = false; + + // InputMetricsStats + private InputMetricsStats inputMetricsStats; + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // input + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + // metadata + hadoopConf = context.getConfiguration(); + if (model == null) { + 
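+      // no QueryModel was pushed down via setQueryModel(), so derive one from the input format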
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.createQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    // map each storage column to its slot in the filter row
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFilterDimensions = model.getIsFilterDimensions();
+    boolean[] isFilterMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    // map each storage column to its slot in the output (projection) row
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int j = 0; j < projection.length; j++) {
+      for (int i = 0; i < storageColumns.length; i++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      skipScanData = true;
+    }
+
+  }
+
+  private void initializeFilter() {
+
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    filter =
FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + + } + + public void setQueryModel(QueryModel model) { + this.model = model; + } + + private byte[] getSyncMarker(String filePath) throws IOException { + CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); + FileHeader header = headerReader.readHeader(); + return header.getSync_marker(); + } + + public void setUseRawRow(boolean useRawRow) { + this.useRawRow = useRawRow; + } + + private void initializeAtFirstRow() throws IOException { + filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; + filterRow = new RowImpl(); + filterRow.setValues(filterValues); + + outputValues = new Object[projection.length]; + outputRow = new GenericInternalRow(outputValues); + + Path file = fileSplit.getPath(); + + byte[] syncMarker = getSyncMarker(file.toString()); + + FileSystem fs = file.getFileSystem(hadoopConf); + + int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE, + CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT)); + + FSDataInputStream fileIn = fs.open(file, bufferSize); + fileIn.seek(fileSplit.getStart()); + input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), + fileSplit.getStart() == 0); + + cacheProvider = CacheProvider.getInstance(); + cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); + queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache); + + outputSchema = new StructType((StructField[]) + DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (isFirstRow) { + isFirstRow = false; + initializeAtFirstRow(); + } + if (isFinished) { + return false; + } + + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + /** + * for vector reader, check next columnar batch + */ + private boolean nextColumnarBatch() throws IOException { + boolean hasNext; + boolean scanMore = false; + do { + // move to the next blocklet + hasNext = input.nextBlocklet(); + if (hasNext) { + // read blocklet header + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + scanMore = !scanBlockletAndFillVector(header); + } else { + input.skipBlockletData(true); + scanMore = true; + } + } else { + isFinished = true; + scanMore = false; + } + } while (scanMore); + return hasNext; + } + + /** + * check next Row + */ + private boolean nextRow() throws IOException { + // read row one by one + try { + boolean hasNext; + boolean scanMore = false; + do { + hasNext = input.hasNext(); + if (hasNext) { + if (skipScanData) { + input.nextRow(); + scanMore = false; + } else { + if (useRawRow) { + // read raw row for streaming handoff which does not require decode raw row + readRawRowFromStream(); + } else { + readRowFromStream(); + } + if (null != filter) { + scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax()); + } else { + scanMore = false; + } + } + } else { + if (input.nextBlocklet()) { + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + if (skipScanData) { + input.skipBlockletData(false); + } else { + input.readBlockletData(header); + } + } else { + input.skipBlockletData(true); + 
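+              // skip blocklets pruned by the header check (dead branch today:
+              // isScanRequired always returns true until min-max indexing is implemented)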
} + scanMore = true; + } else { + isFinished = true; + scanMore = false; + } + } + } while (scanMore); + return hasNext; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in detail reader", e); + } + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public Object getCurrentValue() throws IOException, InterruptedException { + if (isVectorReader) { + int value = columnarBatch.numValidRows(); + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead((long) value); + } + + return columnarBatch; + } + + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead(1L); + } + + return outputRow; + } + + private boolean isScanRequired(BlockletHeader header) { + // TODO require to implement min-max index + return true; + } + + private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { + // if filter is null and output projection is empty, use the row number of blocklet header + if (skipScanData) { + int rowNums = header.getBlocklet_info().getNum_rows(); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums); + columnarBatch.setNumRows(rowNums); + input.skipBlockletData(true); + return rowNums > 0; + } + + input.readBlockletData(header); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums()); + int rowNum = 0; + if (null == filter) { + while (input.hasNext()) { + readRowFromStream(); + putRowToColumnBatch(rowNum++); + } + } else { + try { + while (input.hasNext()) { + readRowFromStream(); + if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { + putRowToColumnBatch(rowNum++); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in vector reader", e); + } + } + columnarBatch.setNumRows(rowNum); + return rowNum > 0; + } + + private void readRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b, + storageColumns[colCount].getDataType()); + } + } else { + input.skipBytes(v); + } + } else if (null != directDictionaryGenerators[colCount]) { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt()); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } else { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + 
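+              // dictionary-encoded dimension: read the 4-byte surrogate key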
outputValues[projectionMap[colCount]] = input.readInt(); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + short v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = queryTypes[colCount] + .getDataBasedOnDataType(ByteBuffer.wrap(b)); + } + } else { + input.skipBytes(v); + } + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + if (isRequired[colCount]) { + boolean v = input.readBoolean(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(1); + } + } else if (dataType == DataTypes.SHORT) { + if (isRequired[colCount]) { + short v = input.readShort(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(2); + } + } else if (dataType == DataTypes.INT) { + if (isRequired[colCount]) { + int v = input.readInt(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(4); + } + } else if (dataType == DataTypes.LONG) { + if (isRequired[colCount]) { + long v = input.readLong(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (dataType == DataTypes.DOUBLE) { + if (isRequired[colCount]) { + double v = input.readDouble(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + if (isRequired[colCount]) { + BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v); + } + } else { + input.skipBytes(len); + } + } + } + } + } + + private void readRawRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if 
(nullBitSet.get(colCount)) { + outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } else { + outputValues[colCount] = input.readInt(); + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + short v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + outputValues[colCount] = input.readBoolean(); + } else if (dataType == DataTypes.SHORT) { + outputValues[colCount] = input.readShort(); + } else if (dataType == DataTypes.INT) { + outputValues[colCount] = input.readInt(); + } else if (dataType == DataTypes.LONG) { + outputValues[colCount] = input.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + outputValues[colCount] = input.readDouble(); + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + } + } + } + } + + private void putRowToColumnBatch(int rowId) { + for (int i = 0; i < projection.length; i++) { + Object value = outputValues[i]; + ColumnVector col = columnarBatch.column(i); + org.apache.spark.sql.types.DataType t = col.dataType(); + if (null == value) { + col.putNull(rowId); + } else { + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + col.putBoolean(rowId, (boolean)value); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + col.putByte(rowId, (byte) value); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + col.putShort(rowId, (short) value); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + col.putInt(rowId, (int) value); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + col.putLong(rowId, (long) value); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + col.putFloat(rowId, (float) value); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + col.putDouble(rowId, (double) value); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = (UTF8String) value; + col.putByteArray(rowId, v.getBytes()); + } else if (t instanceof org.apache.spark.sql.types.DecimalType) { + DecimalType dt = (DecimalType)t; + Decimal d = Decimal.fromDecimal(value); + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + col.putInt(rowId, (int)d.toUnscaledLong()); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + col.putLong(rowId, d.toUnscaledLong()); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + col.putByteArray(rowId, bytes, 0, bytes.length); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = (CalendarInterval) value; + col.getChildColumn(0).putInt(rowId, c.months); + col.getChildColumn(1).putLong(rowId, c.microseconds); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + col.putInt(rowId, (int) value); + } else if (t instanceof org.apache.spark.sql.types.TimestampType) { + col.putLong(rowId, (long) value); + } + } + } + } + + @Override public float getProgress() throws 
IOException, InterruptedException { + return 0; + } + + public void setVectorReader(boolean isVectorReader) { + this.isVectorReader = isVectorReader; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + @Override public void close() throws IOException { + if (null != input) { + input.close(); + } + if (null != columnarBatch) { + columnarBatch.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java deleted file mode 100644 index a7940f9..0000000 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.streaming; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; - -/** - * Util class which does utility function for stream module - */ -public class CarbonStreamUtils { - - public static Constructor getConstructorWithReflection(String className, - Class<?>... parameterTypes) - throws ClassNotFoundException, NoSuchMethodException { - Class loadedClass = Class.forName(className); - return loadedClass.getConstructor(parameterTypes); - - } - - public static Object getInstanceWithReflection(Constructor cons, Object... 
initargs) throws - IllegalAccessException, - InvocationTargetException, InstantiationException { - return cons.newInstance(initargs); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java index 5c7ad5e..43fe6ed 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java @@ -46,8 +46,7 @@ public class StreamBlockletReader { private int rowIndex = 0; private boolean isHeaderPresent; - public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, - boolean isHeaderPresent) { + StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) { this.syncMarker = syncMarker; syncLen = syncMarker.length; syncBuffer = new byte[syncLen]; @@ -97,7 +96,7 @@ public class StreamBlockletReader { return false; } - public BlockletHeader readBlockletHeader() throws IOException { + BlockletHeader readBlockletHeader() throws IOException { int len = readIntFromStream(); byte[] b = new byte[len]; if (!readBytesFromStream(b, 0, len)) { @@ -109,7 +108,7 @@ public class StreamBlockletReader { return header; } - public void readBlockletData(BlockletHeader header) throws IOException { + void readBlockletData(BlockletHeader header) throws IOException { ensureCapacity(header.getBlocklet_length()); offset = 0; int len = readIntFromStream(); @@ -120,7 +119,7 @@ public class StreamBlockletReader { compressor.rawUncompress(b, buffer); } - public void skipBlockletData(boolean reset) throws IOException { + void skipBlockletData(boolean reset) throws IOException { int len = readIntFromStream(); skip(len); pos += len; @@ -141,7 +140,7 @@ public class StreamBlockletReader { /** * find the next blocklet */ - public boolean nextBlocklet() throws IOException { + boolean nextBlocklet() throws IOException { if (pos >= limitStart) { return false; } @@ -159,15 +158,15 @@ public class StreamBlockletReader { return pos < limitEnd; } - public boolean hasNext() throws IOException { + boolean hasNext() throws IOException { return rowIndex < rowNums; } - public void nextRow() { + void nextRow() { rowIndex++; } - public int readIntFromStream() throws IOException { + int readIntFromStream() throws IOException { int ch1 = in.read(); int ch2 = in.read(); int ch3 = in.read(); @@ -183,7 +182,7 @@ public class StreamBlockletReader { * @return <code>true</code> if reading data successfully, or * <code>false</code> if there is no more data because the end of the stream has been reached. 
*/ - public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException { + boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException { int readLen = in.read(b, offset, len); if (readLen < 0) { return false; @@ -196,24 +195,24 @@ public class StreamBlockletReader { } } - public boolean readBoolean() { + boolean readBoolean() { return (buffer[offset++]) != 0; } - public short readShort() { + short readShort() { short v = (short) ((buffer[offset + 1] & 255) + ((buffer[offset]) << 8)); offset += 2; return v; } - public byte[] copy(int len) { + byte[] copy(int len) { byte[] b = new byte[len]; System.arraycopy(buffer, offset, b, 0, len); return b; } - public int readInt() { + int readInt() { int v = ((buffer[offset + 3] & 255) + ((buffer[offset + 2] & 255) << 8) + ((buffer[offset + 1] & 255) << 16) + @@ -222,7 +221,7 @@ public class StreamBlockletReader { return v; } - public long readLong() { + long readLong() { long v = ((long)(buffer[offset + 7] & 255)) + ((long) (buffer[offset + 6] & 255) << 8) + ((long) (buffer[offset + 5] & 255) << 16) + @@ -235,26 +234,26 @@ public class StreamBlockletReader { return v; } - public double readDouble() { + double readDouble() { return Double.longBitsToDouble(readLong()); } - public byte[] readBytes(int len) { + byte[] readBytes(int len) { byte[] b = new byte[len]; System.arraycopy(buffer, offset, b, 0, len); offset += len; return b; } - public void skipBytes(int len) { + void skipBytes(int len) { offset += len; } - public int getRowNums() { + int getRowNums() { return rowNums; } - public void close() { + void close() { CarbonUtil.closeStreams(in); } }
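A note on the byte decoding kept in StreamBlockletReader: readShort/readInt/readLong assemble values big-endian by hand rather than through java.nio. The self-contained snippet below (class name is illustrative, not part of this patch) mirrors the readInt arithmetic and cross-checks it against java.nio.ByteBuffer, which reads big-endian by default:

    import java.nio.ByteBuffer;

    public class BigEndianIntDemo {
      // Mirrors StreamBlockletReader.readInt: four buffer bytes, big-endian.
      static int readInt(byte[] buffer, int offset) {
        return (buffer[offset + 3] & 255)
            + ((buffer[offset + 2] & 255) << 8)
            + ((buffer[offset + 1] & 255) << 16)
            + (buffer[offset] << 24);
      }

      public static void main(String[] args) {
        byte[] b = ByteBuffer.allocate(4).putInt(0x12345678).array();
        // prints true: the manual decode matches ByteBuffer's big-endian getInt
        System.out.println(readInt(b, 0) == ByteBuffer.wrap(b).getInt());
      }
    }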