GitHub user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/3001#discussion_r243176905
  
    --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/streaming/PrestoCarbonStreamRecordReader.java ---
    @@ -0,0 +1,617 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.presto.streaming;
    +
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.nio.ByteBuffer;
    +import java.util.BitSet;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.cache.Cache;
    +import org.apache.carbondata.core.cache.CacheProvider;
    +import org.apache.carbondata.core.cache.CacheType;
    +import org.apache.carbondata.core.cache.dictionary.Dictionary;
    +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.compression.CompressorFactory;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
    +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
    +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonHeaderReader;
    +import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
    +import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
    +import org.apache.carbondata.core.scan.complextypes.StructQueryType;
    +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.GenericQueryType;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.intf.RowImpl;
    +import org.apache.carbondata.core.scan.filter.intf.RowIntf;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +import org.apache.carbondata.format.BlockletHeader;
    +import org.apache.carbondata.format.FileHeader;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +
    +public class PrestoCarbonStreamRecordReader extends RecordReader<Void, Object> {
    +
    +  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
    +  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
    +
    +  // metadata
    +  private CarbonTable carbonTable;
    +  private CarbonColumn[] storageColumns;
    +  private boolean[] isRequired;
    +  private DataType[] measureDataTypes;
    +  private int dimensionCount;
    +  private int measureCount;
    +
    +  // input
    +  private FileSplit fileSplit;
    +  private Configuration hadoopConf;
    +  private PrestoStreamBlockletReader input;
    +  private boolean isFirstRow = true;
    +  private QueryModel model;
    +
    +  // decode data
    +  private BitSet allNonNull;
    +  private boolean[] isNoDictColumn;
    +  private DirectDictionaryGenerator[] directDictionaryGenerators;
    +  private CacheProvider cacheProvider;
    +  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
    +  private GenericQueryType[] queryTypes;
    +  private String compressorName;
    +
    +  // vectorized reader
    +  private boolean isFinished = false;
    +
    +  // filter
    +  private FilterExecuter filter;
    +  private boolean[] isFilterRequired;
    +  private Object[] filterValues;
    +  private RowIntf filterRow;
    +  private int[] filterMap;
    +
    +  // output
    +  private CarbonColumn[] projection;
    +  private boolean[] isProjectionRequired;
    +  private int[] projectionMap;
    +  private Object[] outputValues;
    +
    +  // skip scanning when the projection is empty and there is no filter
    +  private boolean skipScanData;
    +
    +  public PrestoCarbonStreamRecordReader(QueryModel mdl) {
    +    this.model = mdl;
    +  }
    +
    +  @Override public void initialize(InputSplit split, TaskAttemptContext context)
    +      throws IOException {
    +    // input
    +    if (split instanceof CarbonInputSplit) {
    +      fileSplit = (CarbonInputSplit) split;
    +    } else if (split instanceof CarbonMultiBlockSplit) {
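    +      // a multi-block split is handled by reading only its first file split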
    +      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
    +    } else {
    +      fileSplit = (FileSplit) split;
    +    }
    +
    +    // metadata
    +    hadoopConf = context.getConfiguration();
    +    if (model == null) {
    +      CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
    +      model = format.createQueryModel(split, context);
    +    }
    +    carbonTable = model.getTable();
    +    List<CarbonDimension> dimensions =
    +        carbonTable.getDimensionByTableName(carbonTable.getTableName());
    +    dimensionCount = dimensions.size();
    +    List<CarbonMeasure> measures =
    +        carbonTable.getMeasureByTableName(carbonTable.getTableName());
    +    measureCount = measures.size();
    +    List<CarbonColumn> carbonColumnList =
    +        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
    +    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
    +    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
    +    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
    +    for (int i = 0; i < storageColumns.length; i++) {
    +      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
    +        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
    +            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
    +      }
    +    }
    +    measureDataTypes = new DataType[measureCount];
    +    for (int i = 0; i < measureCount; i++) {
    +      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
    +    }
    +
    +    // decode data
    +    allNonNull = new BitSet(storageColumns.length);
    +    projection = model.getProjectionColumns();
    +
    +    isRequired = new boolean[storageColumns.length];
    +    boolean[] isFilterDimensions = model.getIsFilterDimensions();
    +    boolean[] isFilterMeasures = model.getIsFilterMeasures();
    +    isFilterRequired = new boolean[storageColumns.length];
    +    filterMap = new int[storageColumns.length];
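    +    // map each filter column to its slot in the filter row: dimensions use their
    +    // ordinal directly, measures are offset by the max dimension ordinal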
    +    for (int i = 0; i < storageColumns.length; i++) {
    +      if (storageColumns[i].isDimension()) {
    +        if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
    +          isRequired[i] = true;
    +          isFilterRequired[i] = true;
    +          filterMap[i] = storageColumns[i].getOrdinal();
    +        }
    +      } else {
    +        if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
    +          isRequired[i] = true;
    +          isFilterRequired[i] = true;
    +          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
    +        }
    +      }
    +    }
    +
    +    isProjectionRequired = new boolean[storageColumns.length];
    +    projectionMap = new int[storageColumns.length];
    +    for (int j = 0; j < projection.length; j++) {
    +      for (int i = 0; i < storageColumns.length; i++) {
    +        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
    +          isRequired[i] = true;
    +          isProjectionRequired[i] = true;
    +          projectionMap[i] = j;
    +          break;
    +        }
    +      }
    +    }
    +
    +    // initialize filter
    +    if (null != model.getFilterExpressionResolverTree()) {
    +      initializeFilter();
    +    } else if (projection.length == 0) {
    +      skipScanData = true;
    +    }
    +  }
    +
    +  private void initializeFilter() {
    +    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
    +        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
    +            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
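    +    // cardinality is unknown for a streaming segment, so treat every column as unbounded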
    +    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
    +    for (int i = 0; i < dimLensWithComplex.length; i++) {
    +      dimLensWithComplex[i] = Integer.MAX_VALUE;
    +    }
    +
    +    int[] dictionaryColumnCardinality =
    +        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
    +    SegmentProperties segmentProperties =
    +        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
    +    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
    +
    +    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
    +    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
    +        complexDimensionInfoMap);
    +    // for row filters, we need to update the column index
    +    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
    +        carbonTable.getDimensionOrdinalMax());
    +  }
    +
    +  private byte[] getSyncMarker(String filePath) throws IOException {
    +    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
    +    FileHeader header = headerReader.readHeader();
    +    // legacy store does not have this member
    +    if (header.isSetCompressor_name()) {
    +      compressorName = header.getCompressor_name();
    +    } else {
    +      compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
    +    }
    +    return header.getSync_marker();
    +  }
    +
    +  private void initializeAtFirstRow() throws IOException {
    +    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
    +    filterRow = new RowImpl();
    +    filterRow.setValues(filterValues);
    +
    +    outputValues = new Object[projection.length];
    +
    +    Path file = fileSplit.getPath();
    +
    +    byte[] syncMarker = getSyncMarker(file.toString());
    +
    +    FileSystem fs = file.getFileSystem(hadoopConf);
    +
    +    int bufferSize = Integer.parseInt(hadoopConf.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
    +
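    +    // seek to the split start; the sync marker from the file header lets the
    +    // blocklet reader find blocklet boundaries within the stream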
    +    FSDataInputStream fileIn = fs.open(file, bufferSize);
    +    fileIn.seek(fileSplit.getStart());
    +    input = new PrestoStreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
    +        fileSplit.getStart() == 0, compressorName);
    +
    +    cacheProvider = CacheProvider.getInstance();
    +    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
    +    queryTypes = getComplexDimensions(carbonTable, storageColumns, cache);
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +    if (isFirstRow) {
    +      isFirstRow = false;
    +      initializeAtFirstRow();
    +    }
    +    if (isFinished) {
    +      return false;
    +    }
    +
    +    return nextRow();
    --- End diff ---
    
    The stream file is in row format; it doesn't have a vectorized read flow.
    I have already removed this file and just reuse the old record reader.
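
    For context, here is a minimal sketch of what the row-format read path looks
    like from the caller's side: the reader is driven through the plain Hadoop
    RecordReader contract, decoding one row per nextKeyValue() call, with no
    batch or vector API involved. The class name RowStreamScan and the Object[]
    row shape are illustrative assumptions, not actual CarbonData API:

        import java.io.IOException;

        import org.apache.hadoop.mapreduce.InputSplit;
        import org.apache.hadoop.mapreduce.RecordReader;
        import org.apache.hadoop.mapreduce.TaskAttemptContext;

        public final class RowStreamScan {
          // Drain a row-wise reader; each nextKeyValue() call decodes exactly one row.
          static long scan(RecordReader<Void, Object> reader, InputSplit split,
              TaskAttemptContext context) throws IOException, InterruptedException {
            reader.initialize(split, context);
            long rows = 0;
            try {
              while (reader.nextKeyValue()) {
                // the reader above returns the projected row as Object[] (assumed shape)
                Object[] row = (Object[]) reader.getCurrentValue();
                if (row != null) {
                  rows++;
                }
              }
            } finally {
              reader.close();
            }
            return rows;
          }
        }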

