http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java new file mode 100644 index 0000000..b10bc8b --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.IOException; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.scan.complextypes.ArrayQueryType; +import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType; +import org.apache.carbondata.core.scan.complextypes.StructQueryType; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +/** + * Stream input format + */ +public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { + + public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size"; + public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; + + @Override public RecordReader<Void, Object> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new CarbonStreamRecordReader(); + } + + public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable, + CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) + throws IOException { + GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + if (carbonColumns[i].isComplex()) { + if 
(carbonColumns[i].getDataType() == DataTypes.ARRAY) { + queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(), + carbonColumns[i].getColName(), i); + } else if (carbonColumns[i].getDataType() == DataTypes.STRUCT) { + queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(), + carbonColumns[i].getColName(), i); + } else { + throw new UnsupportedOperationException( + carbonColumns[i].getDataType().getName() + " is not supported"); + } + + fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache); + } + } + + return queryTypes; + } + + private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType, + CarbonDimension dimension, int parentBlockIndex, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException { + for (int i = 0; i < dimension.getNumberOfChild(); i++) { + CarbonDimension child = dimension.getListOfChildDimensions().get(i); + DataType dataType = child.getDataType(); + GenericQueryType queryType = null; + if (dataType == DataTypes.ARRAY) { + queryType = + new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); + + } else if (dataType == DataTypes.STRUCT) { + queryType = + new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex); + } else { + boolean isDirectDictionary = + CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY); + DictionaryColumnUniqueIdentifier dictionaryIdentifier = + new DictionaryColumnUniqueIdentifier(carbontable.getCarbonTableIdentifier(), + child.getColumnIdentifier(), child.getDataType(), + CarbonStorePath.getCarbonTablePath(carbontable.getAbsoluteTableIdentifier())); + + queryType = + new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex, + child.getDataType(), 4, cache.get(dictionaryIdentifier), + isDirectDictionary); + } + parentQueryType.addChildren(queryType); + if (child.getNumberOfChild() > 0) { + fillChildren(carbontable, queryType, child, parentBlockIndex, cache); + } + } + } +}
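For context, a minimal sketch of how a reader-side caller can wire getComplexDimensions() to a forward-dictionary cache; it mirrors what CarbonStreamRecordReader.initializeAtFirstRow() does further down in this patch. carbonTable and storageColumns are assumed to be already resolved from the query model, and imports follow that class.

  // Sketch only: build per-column query trees for complex (ARRAY/STRUCT) dimensions.
  static GenericQueryType[] buildComplexReaders(CarbonTable carbonTable,
      CarbonColumn[] storageColumns) throws IOException {
    CacheProvider cacheProvider = CacheProvider.getInstance();
    // forward-dictionary cache rooted at the table's store path
    Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
    return CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
  }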
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java new file mode 100644 index 0000000..1c21504 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Stream output format + */ +public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> { + + static final byte[] CARBON_SYNC_MARKER = + "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + + public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size"; + + public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024; + + public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums"; + + public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000; + + public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size"; + + public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024; + + private static final String LOAD_Model = "mapreduce.output.carbon.load.model"; + + @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + return new CarbonStreamRecordWriter(job); + } + + public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel) + throws IOException { + if (carbonLoadModel != null) { + hadoopConf.set(LOAD_Model, ObjectSerializationUtil.convertObjectToString(carbonLoadModel)); + } + } + + public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException { + String value = hadoopConf.get(LOAD_Model); + if (value == null) { + return null; + } else { + return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value); + } + } + +} 
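For context, a minimal sketch of the job-side configuration this output format expects, assuming a TaskAttemptContext or Job named job and a prepared CarbonLoadModel named loadModel (see StoreCreator.buildCarbonLoadModel at the end of this patch); the tuning keys fall back to the defaults declared above when unset.

  // Sketch only: configure CarbonStreamOutputFormat before obtaining a RecordWriter.
  Configuration conf = job.getConfiguration();
  CarbonStreamOutputFormat.setCarbonLoadModel(conf, loadModel);
  conf.setInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS, 32000);
  conf.setInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE, 32 * 1024 * 1024);
  conf.setInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, 1024);
  // The writer later reads the model back via CarbonStreamOutputFormat.getCarbonLoadModel(conf).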
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java new file mode 100644 index 0000000..1ff0fa7 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import 
org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.util.CarbonTypeUtil; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Stream record reader + */ +public class CarbonStreamRecordReader extends RecordReader<Void, Object> { + // vector reader + private boolean isVectorReader; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] storageColumns; + private boolean[] isRequired; + private int[] measureDataTypes; + private int dimensionCount; + private int measureCount; + + // input + private FileSplit fileSplit; + private Configuration hadoopConf; + private StreamBlockletReader input; + private boolean isFirstRow = true; + private QueryModel model; + + // decode data + private BitSet allNonNull; + private boolean[] isNoDictColumn; + private DirectDictionaryGenerator[] directDictionaryGenerators; + private CacheProvider cacheProvider; + private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; + private GenericQueryType[] queryTypes; + + // vectorized reader + private StructType outputSchema; + private ColumnarBatch columnarBatch; + private boolean isFinished = false; + + // filter + private FilterExecuter filter; + private boolean[] isFilterRequired; + private Object[] filterValues; + private RowIntf filterRow; + private int[] filterMap; + + // output + private CarbonColumn[] projection; + private boolean[] isProjectionRequired; + private int[] projectionMap; + private Object[] outputValues; + private InternalRow outputRow; + + // empty project, null filter + private boolean skipScanData; + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // input + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + // metadata + hadoopConf = context.getConfiguration(); + if (model == null) { + CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + model = format.getQueryModel(split, context); + } + 
carbonTable = model.getTable(); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + dimensionCount = dimensions.size(); + List<CarbonMeasure> measures = + carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + measureCount = measures.size(); + List<CarbonColumn> carbonColumnList = + carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName()); + storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); + isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); + directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(storageColumns[i].getDataType()); + } + } + measureDataTypes = new int[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType().getId(); + } + + // decode data + allNonNull = new BitSet(storageColumns.length); + projection = model.getProjectionColumns(); + + isRequired = new boolean[storageColumns.length]; + boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); + boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); + isFilterRequired = new boolean[storageColumns.length]; + filterMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].isDimension()) { + if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = storageColumns[i].getOrdinal(); + } + } else { + if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal(); + } + } + } + + isProjectionRequired = new boolean[storageColumns.length]; + projectionMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + for (int j = 0; j < projection.length; j++) { + if (storageColumns[i].getColName().equals(projection[j].getColName())) { + isRequired[i] = true; + isProjectionRequired[i] = true; + projectionMap[i] = j; + break; + } + } + } + + // initialize filter + if (null != model.getFilterExpressionResolverTree()) { + initializeFilter(); + } else if (projection.length == 0) { + skipScanData = true; + } + + } + + private void initializeFilter() { + + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()), + carbonTable.getMeasureByTableName(carbonTable.getFactTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree(); + filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index 
+ FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + + } + + public void setQueryModel(QueryModel model) { + this.model = model; + } + + private byte[] getSyncMarker(String filePath) throws IOException { + CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); + FileHeader header = headerReader.readHeader(); + return header.getSync_marker(); + } + + private void initializeAtFirstRow() throws IOException { + filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; + filterRow = new RowImpl(); + filterRow.setValues(filterValues); + + outputValues = new Object[projection.length]; + outputRow = new GenericInternalRow(outputValues); + + Path file = fileSplit.getPath(); + + byte[] syncMarker = getSyncMarker(file.toUri().getPath()); + + FileSystem fs = file.getFileSystem(hadoopConf); + + int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE, + CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT)); + + FSDataInputStream fileIn = fs.open(file, bufferSize); + fileIn.seek(fileSplit.getStart()); + input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), + fileSplit.getStart() == 0); + + cacheProvider = CacheProvider.getInstance(); + cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath()); + queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache); + + outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (isFirstRow) { + isFirstRow = false; + initializeAtFirstRow(); + } + if (isFinished) { + return false; + } + + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + /** + * for vector reader, check next columnar batch + */ + private boolean nextColumnarBatch() throws IOException { + boolean hasNext; + boolean scanMore = false; + do { + // move to the next blocklet + hasNext = input.nextBlocklet(); + if (hasNext) { + // read blocklet header + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + scanMore = !scanBlockletAndFillVector(header); + } else { + input.skipBlockletData(true); + scanMore = true; + } + } else { + isFinished = true; + scanMore = false; + } + } while (scanMore); + return hasNext; + } + + /** + * check next Row + */ + private boolean nextRow() throws IOException { + // read row one by one + try { + boolean hasNext; + boolean scanMore = false; + do { + hasNext = input.hasNext(); + if (hasNext) { + if (skipScanData) { + input.nextRow(); + scanMore = false; + } else { + readRowFromStream(); + if (null != filter) { + scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax()); + } else { + scanMore = false; + } + } + } else { + if (input.nextBlocklet()) { + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + if (skipScanData) { + input.skipBlockletData(false); + } else { + input.readBlockletData(header); + } + } else { + input.skipBlockletData(true); + } + scanMore = true; + } else { + isFinished = true; + scanMore = false; + } + } + } while (scanMore); + return hasNext; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in detail reader", e); + } + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public 
Object getCurrentValue() throws IOException, InterruptedException { + if (isVectorReader) { + return columnarBatch; + } + return outputRow; + } + + private boolean isScanRequired(BlockletHeader header) { + // TODO require to implement min-max index + if (null == filter) { + return true; + } + return true; + } + + private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { + // if filter is null and output projection is empty, use the row number of blocklet header + if (skipScanData) { + int rowNums = header.getBlocklet_info().getNum_rows(); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums); + columnarBatch.setNumRows(rowNums); + input.skipBlockletData(true); + return rowNums > 0; + } + + input.readBlockletData(header); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums()); + int rowNum = 0; + if (null == filter) { + while (input.hasNext()) { + readRowFromStream(); + putRowToColumnBatch(rowNum++); + } + } else { + try { + while (input.hasNext()) { + readRowFromStream(); + if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { + putRowToColumnBatch(rowNum++); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in vector reader", e); + } + } + columnarBatch.setNumRows(rowNum); + return rowNum > 0; + } + + private void readRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b, + storageColumns[colCount].getDataType()); + } + } else { + input.skipBytes(v); + } + } else if (null != directDictionaryGenerators[colCount]) { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt()); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } else { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = input.readInt(); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + short v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if 
(isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = queryTypes[colCount] + .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b)); + } + } else { + input.skipBytes(v); + } + } + } + // measure + int dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN_TYPE_ID) { + if (isRequired[colCount]) { + boolean v = input.readBoolean(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(1); + } + } else if (dataType == DataTypes.SHORT_TYPE_ID) { + if (isRequired[colCount]) { + short v = input.readShort(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(2); + } + } else if (dataType == DataTypes.INT_TYPE_ID) { + if (isRequired[colCount]) { + int v = input.readInt(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(4); + } + } else if (dataType == DataTypes.LONG_TYPE_ID) { + if (isRequired[colCount]) { + long v = input.readLong(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (dataType == DataTypes.DOUBLE_TYPE_ID) { + if (isRequired[colCount]) { + double v = input.readDouble(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (dataType == DataTypes.DECIMAL_TYPE_ID) { + int len = input.readShort(); + if (isRequired[colCount]) { + BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(len); + } + } + } + } + } + + private void putRowToColumnBatch(int rowId) { + for (int i = 0; i < projection.length; i++) { + Object value = outputValues[i]; + ColumnVector col = columnarBatch.column(i); + DataType t = col.dataType(); + if (null == value) { + col.putNull(rowId); + } else { + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + col.putBoolean(rowId, (boolean)value); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + col.putByte(rowId, (byte) value); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + col.putShort(rowId, (short) value); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + col.putInt(rowId, (int) value); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + col.putLong(rowId, (long) value); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + col.putFloat(rowId, (float) value); + } 
else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + col.putDouble(rowId, (double) value); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = (UTF8String) value; + col.putByteArray(rowId, v.getBytes()); + } else if (t instanceof DecimalType) { + DecimalType dt = (DecimalType)t; + Decimal d = (Decimal) value; + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + col.putInt(rowId, (int)d.toUnscaledLong()); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + col.putLong(rowId, d.toUnscaledLong()); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + col.putByteArray(rowId, bytes, 0, bytes.length); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = (CalendarInterval) value; + col.getChildColumn(0).putInt(rowId, c.months); + col.getChildColumn(1).putLong(rowId, c.microseconds); + } else if (t instanceof DateType) { + col.putInt(rowId, (int) value); + } else if (t instanceof TimestampType) { + col.putLong(rowId, (long) value); + } + } + } + } + + @Override public float getProgress() throws IOException, InterruptedException { + return 0; + } + + public void setVectorReader(boolean isVectorReader) { + this.isVectorReader = isVectorReader; + } + + @Override public void close() throws IOException { + if (null != input) { + input.close(); + } + if (null != columnarBatch) { + columnarBatch.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java new file mode 100644 index 0000000..8d7a2e3 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.DataLoadProcessBuilder; +import org.apache.carbondata.processing.loading.converter.RowConverter; +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.loading.parser.RowParser; +import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl; +import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl; +import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskID; + +/** + * Stream record writer + */ +public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName()); + + // basic info + private Configuration hadoopConf; + private CarbonDataLoadConfiguration configuration; + private CarbonTable carbonTable; + private int maxRowNums; + private int maxCacheSize; + + // parser and converter + private RowParser rowParser; + private RowConverter converter; + private CarbonRow currentRow = new CarbonRow(null); + + // encoder + private DataField[] dataFields; + private BitSet nullBitSet; + private boolean[] isNoDictionaryDimensionColumn; + private int dimensionWithComplexCount; + private int measureCount; + private int[] measureDataTypes; + private StreamBlockletWriter output = null; + + // data write + private String segmentDir; + private String fileName; + private DataOutputStream outputStream; + private boolean isFirstRow = true; + private boolean hasException = false; + + CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException { + initialize(job); + } + + private void initialize(TaskAttemptContext job) throws IOException { + // set basic information + hadoopConf = job.getConfiguration(); + CarbonLoadModel carbonLoadModel = 
CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf); + if (carbonLoadModel == null) { + throw new IOException( + "CarbonStreamRecordWriter requires configuration: mapreduce.output.carbon.load.model"); + } + carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); + int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId(); + carbonLoadModel.setTaskNo("" + taskNo); + configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel); + maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS, + CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1; + maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE, + CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT); + + CarbonTablePath tablePath = + CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); + segmentDir = tablePath.getSegmentDir("0", carbonLoadModel.getSegmentId()); + fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); + } + + private void initializeAtFirstRow() throws IOException, InterruptedException { + isFirstRow = false; + + // initialize metadata + isNoDictionaryDimensionColumn = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + dimensionWithComplexCount = configuration.getDimensionCount(); + measureCount = configuration.getMeasureCount(); + dataFields = configuration.getDataFields(); + measureDataTypes = new int[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = + dataFields[dimensionWithComplexCount + i].getColumn().getDataType().getId(); + } + + // initialize parser and converter + rowParser = new RowParserImpl(dataFields, configuration); + BadRecordsLogger badRecordLogger = + DataConverterProcessorStepImpl.createBadRecordLogger(configuration); + converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); + configuration.setCardinalityFinder(converter); + converter.initialize(); + + // initialize encoder + nullBitSet = new BitSet(dataFields.length); + int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, + CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT); + output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize); + + // initialize data writer + String filePath = segmentDir + File.separator + fileName; + FileFactory.FileType fileType = FileFactory.getFileType(filePath); + CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType); + if (carbonFile.exists()) { + // if the file already exists, use the append API + outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType); + } else { + // if the file does not exist, use the create API + outputStream = FileFactory.getDataOutputStream(filePath, fileType); + writeFileHeader(); + } + } + + @Override public void write(Void key, Object value) throws IOException, InterruptedException { + if (isFirstRow) { + initializeAtFirstRow(); + } + + // parse and convert row + currentRow.setData(rowParser.parseRow((Object[]) value)); + converter.convert(currentRow); + + // null bit set + nullBitSet.clear(); + for (int i = 0; i < dataFields.length; i++) { + if (null == currentRow.getObject(i)) { + nullBitSet.set(i); + } + } + output.nextRow(); + byte[] b = nullBitSet.toByteArray(); + output.writeShort(b.length); + if (b.length > 0) { + output.writeBytes(b); + } + int dimCount = 0; + Object columnValue; + + // primitive type dimension + 
for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + columnValue = currentRow.getObject(dimCount); + if (null != columnValue) { + if (isNoDictionaryDimensionColumn[dimCount]) { + byte[] col = (byte[]) columnValue; + output.writeShort(col.length); + output.writeBytes(col); + } else { + output.writeInt((int) columnValue); + } + } + } + // complex type dimension + for (; dimCount < dimensionWithComplexCount; dimCount++) { + columnValue = currentRow.getObject(dimCount); + if (null != columnValue) { + byte[] col = (byte[]) columnValue; + output.writeShort(col.length); + output.writeBytes(col); + } + } + // measure + int dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++) { + columnValue = currentRow.getObject(dimCount + msrCount); + if (null != columnValue) { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN_TYPE_ID) { + output.writeBoolean((boolean) columnValue); + } else if (dataType == DataTypes.SHORT_TYPE_ID) { + output.writeShort((short) columnValue); + } else if (dataType == DataTypes.INT_TYPE_ID) { + output.writeInt((int) columnValue); + } else if (dataType == DataTypes.LONG_TYPE_ID) { + output.writeLong((long) columnValue); + } else if (dataType == DataTypes.DOUBLE_TYPE_ID) { + output.writeDouble((double) columnValue); + } else if (dataType == DataTypes.DECIMAL_TYPE_ID) { + BigDecimal val = (BigDecimal) columnValue; + byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); + output.writeShort(bigDecimalInBytes.length); + output.writeBytes(bigDecimalInBytes); + } else { + String msg = + "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType() + .getName(); + LOGGER.error(msg); + throw new IOException(msg); + } + } + } + + if (output.isFull()) { + appendBlockletToDataFile(); + } + } + + private void writeFileHeader() throws IOException { + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()), + carbonTable.getMeasureByTableName(carbonTable.getFactTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + List<Integer> cardinality = new ArrayList<>(); + List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter + .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality, + wrapperColumnSchemaList); + FileHeader fileHeader = + CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis()); + fileHeader.setIs_footer_present(false); + fileHeader.setIs_splitable(true); + fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); + outputStream.write(CarbonUtil.getByteArray(fileHeader)); + } + + /** + * write a blocklet to file + */ + private void appendBlockletToDataFile() throws IOException { + if (output.getRowIndex() == -1) { + return; + } + output.apppendBlocklet(outputStream); + outputStream.flush(); + // reset data + output.reset(); + } + + @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + // append remain buffer data + if (!hasException) { + appendBlockletToDataFile(); + converter.finish(); + } + } finally { + // close resource + CarbonUtil.closeStreams(outputStream); + output.close(); + } + } + + public 
String getSegmentDir() { + return segmentDir; + } + + public String getFileName() { + return fileName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java new file mode 100644 index 0000000..eafb142 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletHeader; + +/** + * stream blocklet reader + */ +public class StreamBlockletReader { + + private byte[] buffer; + private int offset; + private final byte[] syncMarker; + private final byte[] syncBuffer; + private final int syncLen; + private long pos = 0; + private final InputStream in; + private final long limitStart; + private final long limitEnd; + private boolean isAlreadySync = false; + private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + private int rowNums = 0; + private int rowIndex = 0; + private boolean isHeaderPresent; + + StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) { + this.syncMarker = syncMarker; + this.syncLen = syncMarker.length; + this.syncBuffer = new byte[syncMarker.length]; + this.in = in; + this.limitStart = limit; + this.limitEnd = limitStart + syncLen; + this.isHeaderPresent = isHeaderPresent; + } + + private void ensureCapacity(int capacity) { + if (buffer == null || capacity > buffer.length) { + buffer = new byte[capacity]; + } + } + + /** + * find the first position of sync_marker in input stream + */ + private boolean sync() throws IOException { + int len = in.read(syncBuffer); + if (len < syncLen) { + return false; + } + pos += syncLen; + boolean skipHeader = false; + for (int i = 0; i < limitStart; i++) { + int j = 0; + for (; j < syncLen; j++) { + if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break; + } + if (syncLen == j) { + if (isHeaderPresent) { + if (skipHeader) { + return true; + } else { + skipHeader = true; + } + } else { + return true; + } + } + int value = in.read(); + if (-1 == value) { + 
return false; + } + syncBuffer[i % syncLen] = (byte) value; + pos++; + } + return false; + } + + BlockletHeader readBlockletHeader() throws IOException { + int len = readIntFromStream(); + byte[] b = new byte[len]; + readBytesFromStream(b); + BlockletHeader header = CarbonUtil.readBlockletHeader(b); + rowNums = header.getBlocklet_info().getNum_rows(); + rowIndex = 0; + return header; + } + + void readBlockletData(BlockletHeader header) throws IOException { + ensureCapacity(header.getBlocklet_length()); + offset = 0; + int len = readIntFromStream(); + byte[] b = new byte[len]; + readBytesFromStream(b); + compressor.rawUncompress(b, buffer); + } + + void skipBlockletData(boolean reset) throws IOException { + int len = readIntFromStream(); + skip(len); + pos += len; + if (reset) { + this.rowNums = 0; + this.rowIndex = 0; + } + } + + private void skip(int len) throws IOException { + long remaining = len; + do { + long skipLen = in.skip(remaining); + remaining -= skipLen; + } while (remaining > 0); + } + + /** + * find the next blocklet + */ + boolean nextBlocklet() throws IOException { + if (pos >= limitStart) { + return false; + } + if (isAlreadySync) { + int v = in.read(syncBuffer); + if (v < syncLen) { + return false; + } + pos += syncLen; + } else { + isAlreadySync = true; + if (!sync()) { + return false; + } + } + + return pos < limitEnd; + } + + boolean hasNext() throws IOException { + return rowIndex < rowNums; + } + + void nextRow() { + rowIndex++; + } + + int readIntFromStream() throws IOException { + int ch1 = in.read(); + int ch2 = in.read(); + int ch3 = in.read(); + int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException(); + pos += 4; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + void readBytesFromStream(byte[] b) throws IOException { + int len = in.read(b, 0, b.length); + if (len < b.length) { + throw new EOFException(); + } + pos += b.length; + } + + boolean readBoolean() { + return (buffer[offset++]) != 0; + } + + short readShort() { + short v = (short) ((buffer[offset + 1] & 255) + + ((buffer[offset]) << 8)); + offset += 2; + return v; + } + + byte[] copy(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + return b; + } + + int readInt() { + int v = ((buffer[offset + 3] & 255) + + ((buffer[offset + 2] & 255) << 8) + + ((buffer[offset + 1] & 255) << 16) + + ((buffer[offset]) << 24)); + offset += 4; + return v; + } + + long readLong() { + long v = ((long)(buffer[offset + 7] & 255)) + + ((long) (buffer[offset + 6] & 255) << 8) + + ((long) (buffer[offset + 5] & 255) << 16) + + ((long) (buffer[offset + 4] & 255) << 24) + + ((long) (buffer[offset + 3] & 255) << 32) + + ((long) (buffer[offset + 2] & 255) << 40) + + ((long) (buffer[offset + 1] & 255) << 48) + + ((long) (buffer[offset]) << 56); + offset += 8; + return v; + } + + double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + byte[] readBytes(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + offset += len; + return b; + } + + void skipBytes(int len) { + offset += len; + } + + int getRowNums() { + return rowNums; + } + + void close() { + CarbonUtil.closeStreams(in); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java new file mode 100644 index 0000000..a0328b3 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.BlockletInfo; +import org.apache.carbondata.format.MutationType; + +/** + * stream blocklet writer + */ +public class StreamBlockletWriter { + private byte[] buffer; + private int maxSize; + private int maxRowNum; + private int rowSize; + private int count = 0; + private int rowIndex = -1; + private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + + StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) { + buffer = new byte[maxSize]; + this.maxSize = maxSize; + this.maxRowNum = maxRowNum; + this.rowSize = rowSize; + } + + private void ensureCapacity(int space) { + int newcount = space + count; + if (newcount > buffer.length) { + byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)]; + System.arraycopy(buffer, 0, newbuf, 0, count); + buffer = newbuf; + } + } + + void reset() { + count = 0; + rowIndex = -1; + } + + byte[] getBytes() { + return buffer; + } + + int getCount() { + return count; + } + + int getRowIndex() { + return rowIndex; + } + + void nextRow() { + rowIndex++; + } + + boolean isFull() { + return rowIndex == maxRowNum || count >= maxSize; + } + + void writeBoolean(boolean val) { + ensureCapacity(1); + buffer[count] = (byte) (val ? 
1 : 0); + count += 1; + } + + void writeShort(int val) { + ensureCapacity(2); + buffer[count + 1] = (byte) (val); + buffer[count] = (byte) (val >>> 8); + count += 2; + } + + void writeInt(int val) { + ensureCapacity(4); + buffer[count + 3] = (byte) (val); + buffer[count + 2] = (byte) (val >>> 8); + buffer[count + 1] = (byte) (val >>> 16); + buffer[count] = (byte) (val >>> 24); + count += 4; + } + + void writeLong(long val) { + ensureCapacity(8); + buffer[count + 7] = (byte) (val); + buffer[count + 6] = (byte) (val >>> 8); + buffer[count + 5] = (byte) (val >>> 16); + buffer[count + 4] = (byte) (val >>> 24); + buffer[count + 3] = (byte) (val >>> 32); + buffer[count + 2] = (byte) (val >>> 40); + buffer[count + 1] = (byte) (val >>> 48); + buffer[count] = (byte) (val >>> 56); + count += 8; + } + + void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + void writeBytes(byte[] b) { + writeBytes(b, 0, b.length); + } + + void writeBytes(byte[] b, int off, int len) { + ensureCapacity(len); + System.arraycopy(b, off, buffer, count, len); + count += len; + } + + void apppendBlocklet(DataOutputStream outputStream) throws IOException { + outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); + + BlockletInfo blockletInfo = new BlockletInfo(); + blockletInfo.setNum_rows(getRowIndex() + 1); + BlockletHeader blockletHeader = new BlockletHeader(); + blockletHeader.setBlocklet_length(getCount()); + blockletHeader.setMutation(MutationType.INSERT); + blockletHeader.setBlocklet_info(blockletInfo); + byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader); + outputStream.writeInt(headerBytes.length); + outputStream.write(headerBytes); + + byte[] compressed = compressor.compressByte(getBytes(), getCount()); + outputStream.writeInt(compressed.length); + outputStream.write(compressed); + } + + void close() { + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index a559cc4..b4444be 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -109,12 +109,14 @@ public class CarbonInputFormatUtil { plan.addDimension(queryDimension); } - public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable) { + public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable, + boolean[] isFilterDimensions, boolean[] isFilterMeasures) { List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); - QueryModel.processFilterExpression(filterExpression, dimensions, measures); + QueryModel.processFilterExpression(filterExpression, dimensions, measures, + isFilterDimensions, isFilterMeasures); if (null != filterExpression) { // Optimize Filter Expression and fit RANGE filters is conditions apply. 
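For context, a hedged sketch of a caller using the new processFilterExpression signature added above; filterExpression and carbonTable are assumed to exist, and sizing the flag arrays by the table's dimension and measure counts is only an illustration (the record reader earlier in this patch indexes them by column ordinal).

  // Sketch only: collect which columns participate in the filter expression.
  List<CarbonDimension> dims = carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
  List<CarbonMeasure> msrs = carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
  boolean[] isFilterDimensions = new boolean[dims.size()];
  boolean[] isFilterMeasures = new boolean[msrs.size()];
  CarbonInputFormatUtil.processFilterExpression(filterExpression, carbonTable,
      isFilterDimensions, isFilterMeasures);
  // After the call, isFilterDimensions[i] / isFilterMeasures[i] is true for filtered columns.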
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java new file mode 100644 index 0000000..395015e --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.util; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; + +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; + +public class CarbonTypeUtil { + + public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( + DataType carbonDataType) { + if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { + return DataTypes.StringType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) { + return DataTypes.ShortType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) { + return DataTypes.IntegerType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { + return DataTypes.LongType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) { + return DataTypes.DoubleType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) { + return DataTypes.BooleanType; + } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) { + return DataTypes.createDecimalType(); + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { + return DataTypes.TimestampType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) { + return DataTypes.DateType; + } else { + return null; + } + } + + public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) { + StructField[] fields = new StructField[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + 
CarbonColumn carbonColumn = carbonColumns[i]; + if (carbonColumn.isDimension()) { + if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(carbonColumn.getDataType()); + fields[i] = new StructField(carbonColumn.getColName(), + CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null); + } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) { + fields[i] = new StructField(carbonColumn.getColName(), + CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); + } else if (carbonColumn.isComplex()) { + fields[i] = new StructField(carbonColumn.getColName(), + CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); + } else { + fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil + .convertCarbonToSparkDataType( + org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null); + } + } else if (carbonColumn.isMeasure()) { + DataType dataType = carbonColumn.getDataType(); + if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { + fields[i] = new StructField(carbonColumn.getColName(), + CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null); + } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) { + CarbonMeasure measure = (CarbonMeasure) carbonColumn; + fields[i] = new StructField(carbonColumn.getColName(), + new DecimalType(measure.getPrecision(), measure.getScale()), true, null); + } else { + fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil + .convertCarbonToSparkDataType( + org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null); + } + } + } + return fields; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index ea90bbf..29d8d03 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -116,6 +116,45 @@ public class StoreCreator { return absoluteTableIdentifier; } + public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath) { + CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); + CarbonLoadModel loadModel = new CarbonLoadModel(); + loadModel.setCarbonDataLoadSchema(schema); + loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); + loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + loadModel.setFactFilePath(factFilePath); + loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); + loadModel.setStorePath(absoluteTableIdentifier.getStorePath()); + loadModel.setDateFormat(null); + 
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS)); + loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)); + loadModel + .setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N"); + loadModel + .setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false"); + loadModel + .setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE"); + loadModel + .setIsEmptyDataBadRecord( + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false"); + loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary"); + loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); + loadModel.setTaskNo("0"); + loadModel.setSegmentId("0"); + loadModel.setPartitionId("0"); + loadModel.setFactTimeStamp(System.currentTimeMillis()); + loadModel.setMaxColumns("10"); + return loadModel; + } + /** * Create store without any restructure */ @@ -131,42 +170,7 @@ public class StoreCreator { CarbonTable table = createTable(); writeDictionary(factFilePath, table); - CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); - CarbonLoadModel loadModel = new CarbonLoadModel(); - String partitionId = "0"; - loadModel.setCarbonDataLoadSchema(schema); - loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setFactFilePath(factFilePath); - loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); - loadModel.setStorePath(absoluteTableIdentifier.getStorePath()); - loadModel.setDateFormat(null); - loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS)); - loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)); - loadModel - .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N"); - loadModel - .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false"); - loadModel - .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE"); - loadModel - .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false"); - loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary"); - loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); - loadModel.setTaskNo("0"); - loadModel.setSegmentId("0"); - loadModel.setPartitionId("0"); - loadModel.setFactTimeStamp(System.currentTimeMillis()); - loadModel.setMaxColumns("10"); + CarbonLoadModel loadModel = buildCarbonLoadModel(table, factFilePath); executeGraph(loadModel, absoluteTableIdentifier.getStorePath()); @@ -176,7 +180,7 @@ public class StoreCreator { } - private static CarbonTable createTable() throws IOException { + public static CarbonTable createTable() throws IOException { TableInfo tableInfo = new TableInfo(); 
tableInfo.setStorePath(absoluteTableIdentifier.getStorePath()); tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 2e840c0..4cbc692 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -135,7 +135,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable> QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl()); // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java index f129474..5e3e5b7 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.AbstractRecordReader; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.util.CarbonTypeUtil; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index fc34127..7ec6b7b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -21,6 +21,8 @@ import java.text.SimpleDateFormat import java.util.{ArrayList, Date, List} import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.apache.hadoop.conf.Configuration @@ -38,9 +40,11 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo import 
org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, TaskMetricsMap} +import org.apache.carbondata.core.statusmanager.FileFormat +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil, TaskMetricsMap} import org.apache.carbondata.hadoop._ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl @@ -82,11 +86,45 @@ class CarbonScanRDD( // get splits val splits = format.getSplits(job) - val result = distributeSplits(splits) - result + + // separate split + // 1. for batch splits, invoke distributeSplits method to create partitions + // 2. for stream splits, create partition for each split by default + val columnarSplits = new ArrayList[InputSplit]() + val streamSplits = new ArrayBuffer[InputSplit]() + splits.asScala.foreach { split => + val carbonInputSplit = split.asInstanceOf[CarbonInputSplit] + if (FileFormat.rowformat == carbonInputSplit.getFileFormat) { + streamSplits += split + } else { + columnarSplits.add(split) + } + } + val batchPartitions = distributeColumnarSplits(columnarSplits) + if (streamSplits.isEmpty) { + batchPartitions.toArray + } else { + val index = batchPartitions.length + val streamPartitions: mutable.Buffer[Partition] = + streamSplits.zipWithIndex.map { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations, + FileFormat.rowformat) + new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit) + } + if (batchPartitions.isEmpty) { + streamPartitions.toArray + } else { + // should keep the order by index of partition + batchPartitions.appendAll(streamPartitions) + batchPartitions.toArray + } + } } - private def distributeSplits(splits: List[InputSplit]): Array[Partition] = { + private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = { // this function distributes the split based on following logic: // 1. based on data locality, to make split balanced on all available nodes // 2. 
if the number of split for one @@ -190,7 +228,7 @@ class CarbonScanRDD( | no.of.nodes: $noOfNodes, | parallelism: $parallelism """.stripMargin) - result.toArray(new Array[Partition](result.size())) + result.asScala } override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { @@ -210,20 +248,34 @@ class CarbonScanRDD( inputMetricsStats.initBytesReadCallback(context, inputSplit) val iterator = if (inputSplit.getAllSplits.size() > 0) { val model = format.getQueryModel(inputSplit, attemptContext) - val reader = { - if (vectorReader) { - val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats) - if (carbonRecordReader == null) { - new CarbonRecordReader(model, - format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats) + // get RecordReader by FileFormat + val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { + case FileFormat.rowformat => + // create record reader for row format + DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) + val inputFormat = new CarbonStreamInputFormat + val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) + .asInstanceOf[CarbonStreamRecordReader] + streamReader.setVectorReader(vectorReader) + model.setStatisticsRecorder( + CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) + streamReader.setQueryModel(model) + streamReader + case _ => + // create record reader for CarbonData file format + if (vectorReader) { + val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats) + if (carbonRecordReader == null) { + new CarbonRecordReader(model, + format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats) + } else { + carbonRecordReader + } } else { - carbonRecordReader + new CarbonRecordReader(model, + format.getReadSupportClass(attemptContext.getConfiguration), + inputMetricsStats) } - } else { - new CarbonRecordReader(model, - format.getReadSupportClass(attemptContext.getConfiguration), - inputMetricsStats) - } } reader.initialize(inputSplit, attemptContext) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index ed01728..18e37ad 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -36,7 +36,7 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark-common</artifactId> + <artifactId>carbondata-streaming</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index d5adc2f..10336eb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -23,9 +23,11 @@ import org.apache.spark.CarbonInputMetrics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.execution.command.management.LoadTableByInsertCommand +import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation} +import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, StreamSinkProvider} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo} import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.spark.rdd.CarbonScanRDD +import org.apache.carbondata.streaming.StreamSinkFactory case class CarbonDatasourceHadoopRelation( sparkSession: SparkSession, http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index eeca8b8..6020eee 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder +import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener import org.apache.spark.sql.hive.CarbonSessionState import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.util.Utils @@ -168,6 +169,7 @@ object CarbonSession { SparkSession.sqlListener.set(null) } }) + session.streams.addListener(new CarbonStreamingQueryListener(session)) } return session
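With the CarbonScanRDD change above, row-format (streaming) splits each get their own partition and are read back through the new CarbonStreamRecordReader rather than the columnar readers, so the read path has to walk the blocklet stream that StreamBlockletWriter produces. The sketch below is an assumption about that on-disk layout derived from apppendBlocklet earlier in this patch, not the actual reader code: each blocklet is "sync marker | header length (int) | BlockletHeader bytes (thrift) | data length (int) | compressed rows". The class name is hypothetical, CARBON_SYNC_MARKER is assumed to be a byte[] constant in the same package, and unCompressByte is assumed to be the read-side counterpart of the compressByte call used when writing.

// Hypothetical reader-side sketch mirroring StreamBlockletWriter.apppendBlocklet; not part of this patch.
package org.apache.carbondata.hadoop.streaming;

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;

public class StreamBlockletReadSketch {

  static byte[] readOneBlocklet(DataInputStream in) throws IOException {
    // skip the sync marker that prefixes every blocklet
    byte[] syncMarker = new byte[CarbonStreamOutputFormat.CARBON_SYNC_MARKER.length];
    in.readFully(syncMarker);

    // length-prefixed BlockletHeader; deserializing it back into the thrift object
    // is left out here because that helper is not shown in this patch
    int headerLength = in.readInt();
    byte[] headerBytes = new byte[headerLength];
    in.readFully(headerBytes);

    // length-prefixed, compressed row payload
    int dataLength = in.readInt();
    byte[] compressed = new byte[dataLength];
    in.readFully(compressed);
    Compressor compressor = CompressorFactory.getInstance().getCompressor();
    return compressor.unCompressByte(compressed);
  }
}

The sync marker in front of every blocklet presumably lets a reader realign at blocklet boundaries inside an appended stream file, which is what makes the per-split CarbonStreamRecordReader created in CarbonScanRDD workable for row-format segments.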