Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 0aab4e7c6 -> 96fe233a2


http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..c4b501d
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Stream record reader
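+ *
+ * In row mode each call to nextKeyValue() produces one InternalRow; with
+ * setVectorReader(true) a whole blocklet is decoded into a ColumnarBatch instead.
+ * A usage sketch, following the standard Hadoop RecordReader contract:
+ * <pre>
+ *   reader.initialize(split, context);
+ *   while (reader.nextKeyValue()) {
+ *     Object value = reader.getCurrentValue(); // InternalRow, or ColumnarBatch in vector mode
+ *   }
+ *   reader.close();
+ * </pre>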
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+  // vector reader
+  private boolean isVectorReader;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] storageColumns;
+  private boolean[] isRequired;
+  private DataType[] measureDataTypes;
+  private int dimensionCount;
+  private int measureCount;
+
+  // input
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  private StreamBlockletReader input;
+  private boolean isFirstRow = true;
+  private QueryModel model;
+
+  // decode data
+  private BitSet allNonNull;
+  private boolean[] isNoDictColumn;
+  private DirectDictionaryGenerator[] directDictionaryGenerators;
+  private CacheProvider cacheProvider;
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+  private GenericQueryType[] queryTypes;
+
+  // vectorized reader
+  private StructType outputSchema;
+  private ColumnarBatch columnarBatch;
+  private boolean isFinished = false;
+
+  // filter
+  private FilterExecuter filter;
+  private boolean[] isFilterRequired;
+  private Object[] filterValues;
+  private RowIntf filterRow;
+  private int[] filterMap;
+
+  // output
+  private CarbonColumn[] projection;
+  private boolean[] isProjectionRequired;
+  private int[] projectionMap;
+  private Object[] outputValues;
+  private InternalRow outputRow;
+
+  // empty project, null filter
+  private boolean skipScanData;
+
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+
+  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // input
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    // metadata
+    hadoopConf = context.getConfiguration();
+    if (model == null) {
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.createQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFilterDimensions = model.getIsFilterDimensions();
+    boolean[] isFilterMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int j = 0; j < projection.length; j++) {
+      for (int i = 0; i < storageColumns.length; i++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      skipScanData = true;
+    }
+
+  }
+
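+  /**
+   * Builds the filter executer tree from the query model's filter resolver. Cardinality is
+   * not known for streaming data, so every dictionary column is assumed to have
+   * Integer.MAX_VALUE cardinality when the SegmentProperties are created.
+   */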
+  private void initializeFilter() {
+
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+        complexDimensionInfoMap);
+    // for row filter, we need to update the column index
+    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+
+  }
+
+  public void setQueryModel(QueryModel model) {
+    this.model = model;
+  }
+
+  private byte[] getSyncMarker(String filePath) throws IOException {
+    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+    FileHeader header = headerReader.readHeader();
+    return header.getSync_marker();
+  }
+
+  public void setUseRawRow(boolean useRawRow) {
+    this.useRawRow = useRawRow;
+  }
+
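+  /**
+   * Lazy initialization performed on the first call to nextKeyValue(): reads the sync marker
+   * from the carbon file header, opens the stream at the split offset, and prepares the
+   * dictionary cache, complex-type decoders and the Spark output schema.
+   */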
+  private void initializeAtFirstRow() throws IOException {
+    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+    filterRow = new RowImpl();
+    filterRow.setValues(filterValues);
+
+    outputValues = new Object[projection.length];
+    outputRow = new GenericInternalRow(outputValues);
+
+    Path file = fileSplit.getPath();
+
+    byte[] syncMarker = getSyncMarker(file.toString());
+
+    FileSystem fs = file.getFileSystem(hadoopConf);
+
+    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+    FSDataInputStream fileIn = fs.open(file, bufferSize);
+    fileIn.seek(fileSplit.getStart());
+    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+        fileSplit.getStart() == 0);
+
+    cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+    outputSchema = new StructType((StructField[])
+        DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isFirstRow) {
+      isFirstRow = false;
+      initializeAtFirstRow();
+    }
+    if (isFinished) {
+      return false;
+    }
+
+    if (isVectorReader) {
+      return nextColumnarBatch();
+    }
+
+    return nextRow();
+  }
+
+  /**
+   * for the vector reader, advance to and scan the next columnar batch
+   */
+  private boolean nextColumnarBatch() throws IOException {
+    boolean hasNext;
+    boolean scanMore = false;
+    do {
+      // move to the next blocklet
+      hasNext = input.nextBlocklet();
+      if (hasNext) {
+        // read blocklet header
+        BlockletHeader header = input.readBlockletHeader();
+        if (isScanRequired(header)) {
+          scanMore = !scanBlockletAndFillVector(header);
+        } else {
+          input.skipBlockletData(true);
+          scanMore = true;
+        }
+      } else {
+        isFinished = true;
+        scanMore = false;
+      }
+    } while (scanMore);
+    return hasNext;
+  }
+
+  /**
+   * for the row reader, read the next row and apply the filter if one is present
+   */
+  private boolean nextRow() throws IOException {
+    // read row one by one
+    try {
+      boolean hasNext;
+      boolean scanMore = false;
+      do {
+        hasNext = input.hasNext();
+        if (hasNext) {
+          if (skipScanData) {
+            input.nextRow();
+            scanMore = false;
+          } else {
+            if (useRawRow) {
+              // read raw row for streaming handoff, which does not require decoding the row
+              readRawRowFromStream();
+            } else {
+              readRowFromStream();
+            }
+            if (null != filter) {
+              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+            } else {
+              scanMore = false;
+            }
+          }
+        } else {
+          if (input.nextBlocklet()) {
+            BlockletHeader header = input.readBlockletHeader();
+            if (isScanRequired(header)) {
+              if (skipScanData) {
+                input.skipBlockletData(false);
+              } else {
+                input.readBlockletData(header);
+              }
+            } else {
+              input.skipBlockletData(true);
+            }
+            scanMore = true;
+          } else {
+            isFinished = true;
+            scanMore = false;
+          }
+        }
+      } while (scanMore);
+      return hasNext;
+    } catch (FilterUnsupportedException e) {
+      throw new IOException("Failed to filter row in detail reader", e);
+    }
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      int value = columnarBatch.numValidRows();
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
+
+      return columnarBatch;
+    }
+
+    if (inputMetricsStats != null) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+
+    return outputRow;
+  }
+
+  private boolean isScanRequired(BlockletHeader header) {
+    // TODO: implement min-max index pruning; every blocklet is scanned for now
+    return true;
+  }
+
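+  /**
+   * Decodes one blocklet into a ColumnarBatch. Returns true when the batch contains at least
+   * one row; the caller keeps scanning blocklets while the batch is empty.
+   */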
+  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+    // if the filter is null and the output projection is empty, use the row count from the blocklet header
+    if (skipScanData) {
+      int rowNums = header.getBlocklet_info().getNum_rows();
+      columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
+      columnarBatch.setNumRows(rowNums);
+      input.skipBlockletData(true);
+      return rowNums > 0;
+    }
+
+    input.readBlockletData(header);
+    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
+    int rowNum = 0;
+    if (null == filter) {
+      while (input.hasNext()) {
+        readRowFromStream();
+        putRowToColumnBatch(rowNum++);
+      }
+    } else {
+      try {
+        while (input.hasNext()) {
+          readRowFromStream();
+          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+            putRowToColumnBatch(rowNum++);
+          }
+        }
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in vector reader", e);
+      }
+    }
+    columnarBatch.setNumRows(rowNum);
+    return rowNum > 0;
+  }
+
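+  /**
+   * Decodes the next row from the blocklet buffer into filterValues/outputValues. The row
+   * layout read here is: a short null-bitset length, the null-bitset bytes, then primitive
+   * dimensions (no-dictionary: short length + bytes, dictionary encoded: 4-byte surrogate),
+   * complex dimensions (short length + bytes) and finally measures encoded by data type.
+   */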
+  private void readRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          if (isRequired[colCount]) {
+            byte[] b = input.readBytes(v);
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = b;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+                      storageColumns[colCount].getDataType());
+            }
+          } else {
+            input.skipBytes(v);
+          }
+        } else if (null != directDictionaryGenerators[colCount]) {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else {
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = input.readInt();
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        short v = input.readShort();
+        if (isRequired[colCount]) {
+          byte[] b = input.readBytes(v);
+          if (isFilterRequired[colCount]) {
+            filterValues[filterMap[colCount]] = b;
+          }
+          if (isProjectionRequired[colCount]) {
+            outputValues[projectionMap[colCount]] = queryTypes[colCount]
+                .getDataBasedOnDataType(ByteBuffer.wrap(b));
+          }
+        } else {
+          input.skipBytes(v);
+        }
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          if (isRequired[colCount]) {
+            boolean v = input.readBoolean();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(1);
+          }
+        } else if (dataType == DataTypes.SHORT) {
+          if (isRequired[colCount]) {
+            short v = input.readShort();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(2);
+          }
+        } else if (dataType == DataTypes.INT) {
+          if (isRequired[colCount]) {
+            int v = input.readInt();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else if (dataType == DataTypes.LONG) {
+          if (isRequired[colCount]) {
+            long v = input.readLong();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DOUBLE) {
+          if (isRequired[colCount]) {
+            double v = input.readDouble();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          if (isRequired[colCount]) {
+            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+            }
+          } else {
+            input.skipBytes(len);
+          }
+        }
+      }
+    }
+  }
+
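+  /**
+   * Same row layout as readRowFromStream(), but keeps the raw bytes and surrogate values
+   * without decoding them; used for streaming handoff.
+   */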
+  private void readRawRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          outputValues[colCount] = input.readBytes(v);
+        } else {
+          outputValues[colCount] = input.readInt();
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        short v = input.readShort();
+        outputValues[colCount] = input.readBytes(v);
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          outputValues[colCount] = input.readBoolean();
+        } else if (dataType == DataTypes.SHORT) {
+          outputValues[colCount] = input.readShort();
+        } else if (dataType == DataTypes.INT) {
+          outputValues[colCount] = input.readInt();
+        } else if (dataType == DataTypes.LONG) {
+          outputValues[colCount] = input.readLong();
+        } else if (dataType == DataTypes.DOUBLE) {
+          outputValues[colCount] = input.readDouble();
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+        }
+      }
+    }
+  }
+
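+  /**
+   * Copies the decoded projection values of one row into the ColumnarBatch, dispatching on
+   * the Spark data type of each column vector.
+   */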
+  private void putRowToColumnBatch(int rowId) {
+    for (int i = 0; i < projection.length; i++) {
+      Object value = outputValues[i];
+      ColumnVector col = columnarBatch.column(i);
+      org.apache.spark.sql.types.DataType t = col.dataType();
+      if (null == value) {
+        col.putNull(rowId);
+      } else {
+        if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+          col.putBoolean(rowId, (boolean)value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+          col.putByte(rowId, (byte) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+          col.putShort(rowId, (short) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+          col.putInt(rowId, (int) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+          col.putLong(rowId, (long) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+          col.putFloat(rowId, (float) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+          col.putDouble(rowId, (double) value);
+        } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+          UTF8String v = (UTF8String) value;
+          col.putByteArray(rowId, v.getBytes());
+        } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+          DecimalType dt = (DecimalType)t;
+          Decimal d = Decimal.fromDecimal(value);
+          if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+            col.putInt(rowId, (int)d.toUnscaledLong());
+          } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+            col.putLong(rowId, d.toUnscaledLong());
+          } else {
+            final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+            byte[] bytes = integer.toByteArray();
+            col.putByteArray(rowId, bytes, 0, bytes.length);
+          }
+        } else if (t instanceof CalendarIntervalType) {
+          CalendarInterval c = (CalendarInterval) value;
+          col.getChildColumn(0).putInt(rowId, c.months);
+          col.getChildColumn(1).putLong(rowId, c.microseconds);
+        } else if (t instanceof org.apache.spark.sql.types.DateType) {
+          col.putInt(rowId, (int) value);
+        } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+          col.putLong(rowId, (long) value);
+        }
+      }
+    }
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  public void setVectorReader(boolean isVectorReader) {
+    this.isVectorReader = isVectorReader;
+  }
+
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  @Override public void close() throws IOException {
+    if (null != input) {
+      input.close();
+    }
+    if (null != columnarBatch) {
+      columnarBatch.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
deleted file mode 100644
index a7940f9..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.streaming;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * Util class which does utility function for stream module
- */
-public class CarbonStreamUtils {
-
-  public static Constructor getConstructorWithReflection(String className,
-                                                           Class<?>... parameterTypes)
-            throws ClassNotFoundException, NoSuchMethodException {
-    Class loadedClass = Class.forName(className);
-    return loadedClass.getConstructor(parameterTypes);
-
-  }
-
-  public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
-          IllegalAccessException,
-          InvocationTargetException, InstantiationException {
-    return cons.newInstance(initargs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96fe233a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
index 5c7ad5e..43fe6ed 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
@@ -46,8 +46,7 @@ public class StreamBlockletReader {
   private int rowIndex = 0;
   private boolean isHeaderPresent;
 
-  public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
-      boolean isHeaderPresent) {
+  StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
     this.syncMarker = syncMarker;
     syncLen = syncMarker.length;
     syncBuffer = new byte[syncLen];
@@ -97,7 +96,7 @@ public class StreamBlockletReader {
     return false;
   }
 
-  public BlockletHeader readBlockletHeader() throws IOException {
+  BlockletHeader readBlockletHeader() throws IOException {
     int len = readIntFromStream();
     byte[] b = new byte[len];
     if (!readBytesFromStream(b, 0, len)) {
@@ -109,7 +108,7 @@ public class StreamBlockletReader {
     return header;
   }
 
-  public void readBlockletData(BlockletHeader header) throws IOException {
+  void readBlockletData(BlockletHeader header) throws IOException {
     ensureCapacity(header.getBlocklet_length());
     offset = 0;
     int len = readIntFromStream();
@@ -120,7 +119,7 @@ public class StreamBlockletReader {
     compressor.rawUncompress(b, buffer);
   }
 
-  public void skipBlockletData(boolean reset) throws IOException {
+  void skipBlockletData(boolean reset) throws IOException {
     int len = readIntFromStream();
     skip(len);
     pos += len;
@@ -141,7 +140,7 @@ public class StreamBlockletReader {
   /**
    * find the next blocklet
    */
-  public boolean nextBlocklet() throws IOException {
+  boolean nextBlocklet() throws IOException {
     if (pos >= limitStart) {
       return false;
     }
@@ -159,15 +158,15 @@ public class StreamBlockletReader {
     return pos < limitEnd;
   }
 
-  public boolean hasNext() throws IOException {
+  boolean hasNext() throws IOException {
     return rowIndex < rowNums;
   }
 
-  public void nextRow() {
+  void nextRow() {
     rowIndex++;
   }
 
-  public int readIntFromStream() throws IOException {
+  int readIntFromStream() throws IOException {
     int ch1 = in.read();
     int ch2 = in.read();
     int ch3 = in.read();
@@ -183,7 +182,7 @@ public class StreamBlockletReader {
    * @return <code>true</code> if reading data successfully, or
    * <code>false</code> if there is no more data because the end of the stream has been reached.
    */
-  public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+  boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
     int readLen = in.read(b, offset, len);
     if (readLen < 0) {
       return false;
@@ -196,24 +195,24 @@ public class StreamBlockletReader {
     }
   }
 
-  public boolean readBoolean() {
+  boolean readBoolean() {
     return (buffer[offset++]) != 0;
   }
 
-  public short readShort() {
+  short readShort() {
     short v =  (short) ((buffer[offset + 1] & 255) +
         ((buffer[offset]) << 8));
     offset += 2;
     return v;
   }
 
-  public byte[] copy(int len) {
+  byte[] copy(int len) {
     byte[] b = new byte[len];
     System.arraycopy(buffer, offset, b, 0, len);
     return b;
   }
 
-  public int readInt() {
+  int readInt() {
     int v = ((buffer[offset + 3] & 255) +
         ((buffer[offset + 2] & 255) << 8) +
         ((buffer[offset + 1] & 255) << 16) +
@@ -222,7 +221,7 @@ public class StreamBlockletReader {
     return v;
   }
 
-  public long readLong() {
+  long readLong() {
     long v = ((long)(buffer[offset + 7] & 255)) +
         ((long) (buffer[offset + 6] & 255) << 8) +
         ((long) (buffer[offset + 5] & 255) << 16) +
@@ -235,26 +234,26 @@ public class StreamBlockletReader {
     return v;
   }
 
-  public double readDouble() {
+  double readDouble() {
     return Double.longBitsToDouble(readLong());
   }
 
-  public byte[] readBytes(int len) {
+  byte[] readBytes(int len) {
     byte[] b = new byte[len];
     System.arraycopy(buffer, offset, b, 0, len);
     offset += len;
     return b;
   }
 
-  public void skipBytes(int len) {
+  void skipBytes(int len) {
     offset += len;
   }
 
-  public int getRowNums() {
+  int getRowNums() {
     return rowNums;
   }
 
-  public void close() {
+  void close() {
     CarbonUtil.closeStreams(in);
   }
 }
