Github user linwen commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1350#discussion_r178483636 --- Diff: contrib/vexecutor/parquet_reader.c --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "parquet_reader.h" + +#include "executor/executor.h" +#include "tuplebatch.h" +#include "vcheck.h" + +extern bool getNextRowGroup(ParquetScanDesc scan); +static int +ParquetRowGroupReader_ScanNextTupleBatch( + TupleDesc tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot); + +static void +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *slot); + +TupleTableSlot * +ParquetVScanNext(ScanState *scanState) +{ + Assert(IsA(scanState, TableScanState) || IsA(scanState, DynamicTableScanState)); + ParquetScanState *node = (ParquetScanState *)scanState; + Assert(node->opaque != NULL && node->opaque->scandesc != NULL); + + parquet_vgetnext(node->opaque->scandesc, node->ss.ps.state->es_direction, node->ss.ss_ScanTupleSlot); + return node->ss.ss_ScanTupleSlot; +} + +static void +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *slot) +{ + + //AOTupleId aoTupleId; + Assert(ScanDirectionIsForward(direction)); + + for(;;) + { + if(scan->bufferDone) + { + /* + * Get the next row group. We call this function until we + * successfully get a block to process, or finished reading + * all the data (all 'segment' files) for this relation. + */ + while(!getNextRowGroup(scan)) + { + /* have we read all this relation's data. done! */ + if(scan->pqs_done_all_splits) + { + ExecClearTuple(slot); + return /*NULL*/; + } + } + scan->bufferDone = false; + } + + int row_num = ParquetRowGroupReader_ScanNextTupleBatch( + scan->pqs_tupDesc, + &scan->rowGroupReader, + scan->hawqAttrToParquetColChunks, + scan->proj, + slot); + if(row_num > 0) + return; + + /* no more items in the row group, get new buffer */ + scan->bufferDone = true; + } +} + +/* + * Get next tuple batch from current row group into slot. + * + * Return false if current row group has no tuple left, true otherwise. + */ +static int +ParquetRowGroupReader_ScanNextTupleBatch( + TupleDesc tupDesc, + ParquetRowGroupReader *rowGroupReader, + int *hawqAttrToParquetColNum, + bool *projs, + TupleTableSlot *slot) +{ + Assert(slot); + + if (rowGroupReader->rowRead >= rowGroupReader->rowCount) + { + ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader); + return false; + } + + /* + * get the next item (tuple) from the row group + */ + int ncol = slot->tts_tupleDescriptor->natts; + TupleBatch tb = (TupleBatch )slot->PRIVATE_tb; + + tb->nrows = 0; + if (rowGroupReader->rowRead + tb->batchsize > rowGroupReader->rowCount) { + tb->nrows = rowGroupReader->rowCount-rowGroupReader->rowRead; + rowGroupReader->rowRead = rowGroupReader->rowCount; + } + else { + tb->nrows = tb->batchsize ; + rowGroupReader->rowRead += tb->batchsize; + } + + int colReaderIndex = 0; + for(int i = 0; i < tb->ncols ; i++) + { + if(projs[i] == false) + continue; + + Oid hawqTypeID = tupDesc->attrs[i]->atttypid; + Oid hawqVTypeID = GetVtype(hawqTypeID); --- End diff -- Please fix indent here.
---