GitHub user linwen commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1350#discussion_r178483636
  
    --- Diff: contrib/vexecutor/parquet_reader.c ---
    @@ -0,0 +1,194 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +#include "parquet_reader.h"
    +
    +#include "executor/executor.h"
    +#include "tuplebatch.h"
    +#include "vcheck.h"
    +/*
    + * getNextRowGroup() is declared extern here; its definition is not among
    + * the functions visible in this file.  The two static prototypes that
    + * follow are forward declarations for helpers defined further down.
    + */
    +extern bool getNextRowGroup(ParquetScanDesc scan);
    +static int
    +ParquetRowGroupReader_ScanNextTupleBatch(
    +           TupleDesc                               tupDesc,
    +           ParquetRowGroupReader   *rowGroupReader,
    +           int                                             
*hawqAttrToParquetColNum,
    +           bool                                    *projs,
    +           TupleTableSlot                  *slot);
    +
    +static void
    +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, 
TupleTableSlot *slot);
    +/*
    + * ParquetVScanNext
    + *
    + * Passes the node's scan descriptor (node->opaque->scandesc), the
    + * direction taken from node->ss.ps.state->es_direction, and the node's
    + * scan tuple slot to parquet_vgetnext(), then returns that same slot
    + * (node->ss.ss_ScanTupleSlot).
    + *
    + * scanState is asserted to be a TableScanState or DynamicTableScanState
    + * and is then accessed through a ParquetScanState pointer; the opaque
    + * state and its scan descriptor are asserted to be non-NULL.
    + *
    + * When the scan has no more data, parquet_vgetnext() calls
    + * ExecClearTuple() on the slot before returning, so the slot returned
    + * here is that cleared slot rather than a NULL pointer.
    + */
    +TupleTableSlot *
    +ParquetVScanNext(ScanState *scanState)
    +{
    +   Assert(IsA(scanState, TableScanState) || IsA(scanState, 
DynamicTableScanState));
    +   ParquetScanState *node = (ParquetScanState *)scanState;
    +   Assert(node->opaque != NULL && node->opaque->scandesc != NULL);
    +
    +   parquet_vgetnext(node->opaque->scandesc, 
node->ss.ps.state->es_direction, node->ss.ss_ScanTupleSlot);
    +   return node->ss.ss_ScanTupleSlot;
    +}
    +/*
    + * parquet_vgetnext
    + *
    + * Fetches the next tuple batch of the scan into slot.
    + *
    + * Loops until ParquetRowGroupReader_ScanNextTupleBatch() returns a
    + * positive row count.  Whenever scan->bufferDone is set, the loop first
    + * advances with getNextRowGroup(); if that call fails and
    + * scan->pqs_done_all_splits is set, the slot is handed to
    + * ExecClearTuple() and the function returns.
    + *
    + * The direction argument is only checked by the forward-scan assertion.
    + */
    +static void
    +parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, 
TupleTableSlot *slot)
    +{
    +
    +   /* Only forward scans are expected here; this is asserted below. */
    +   Assert(ScanDirectionIsForward(direction));
    +
    +   for(;;)
    +   {
    +           if(scan->bufferDone)
    +           {
    +                   /*
    +                    * Get the next row group. We call this function until
    +                    * we successfully get a block to process, or have
    +                    * finished reading all the data (all 'segment' files)
    +                    * for this relation.
    +                    */
    +                   while(!getNextRowGroup(scan))
    +                   {
    +                           /* all of this relation's data has been read: clear the slot and stop */
    +                           if(scan->pqs_done_all_splits)
    +                           {
    +                                   ExecClearTuple(slot);
    +                                   return /*NULL*/;
    +                           }
    +                   }
    +                   scan->bufferDone = false;	/* getNextRowGroup() succeeded */
    +           }
    +
    +           int row_num  = ParquetRowGroupReader_ScanNextTupleBatch(
    +                                                           
scan->pqs_tupDesc,
    +                                                           
&scan->rowGroupReader,
    +                                                           
scan->hawqAttrToParquetColChunks,
    +                                                           scan->proj,
    +                                                           slot);
    +           if(row_num > 0)
    +                   return;	/* positive count: hand the slot back to the caller */
    +
    +           /* current row group is exhausted; fetch the next one on the next iteration */
    +           scan->bufferDone = true;
    +   }
    +}
    +
    +/*
    + * Get next tuple batch from current row group into slot.
    + *
    + * Return false if current row group has no tuple left, true otherwise.
    + */
    +static int
    +ParquetRowGroupReader_ScanNextTupleBatch(
    +   TupleDesc                               tupDesc,
    +   ParquetRowGroupReader   *rowGroupReader,
    +   int                                             
*hawqAttrToParquetColNum,
    +   bool                                    *projs,
    +   TupleTableSlot                  *slot)
    +{
    +   Assert(slot);
    +
    +   if (rowGroupReader->rowRead >= rowGroupReader->rowCount)
    +   {
    +           ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader);
    +           return false;
    +   }
    +
    +   /*
    +    * get the next item (tuple) from the row group
    +    */
    +   int ncol = slot->tts_tupleDescriptor->natts;
    +    TupleBatch tb = (TupleBatch )slot->PRIVATE_tb;
    +
    +   tb->nrows = 0;
    +   if (rowGroupReader->rowRead + tb->batchsize > rowGroupReader->rowCount) 
{
    +           tb->nrows = rowGroupReader->rowCount-rowGroupReader->rowRead;
    +           rowGroupReader->rowRead = rowGroupReader->rowCount;
    +   }
    +   else {
    +           tb->nrows = tb->batchsize ;
    +           rowGroupReader->rowRead += tb->batchsize;
    +   }
    +
    +   int colReaderIndex = 0;
    +   for(int i = 0; i < tb->ncols ; i++)
    +   {
    +           if(projs[i] == false)
    +                   continue;
    +
    +           Oid hawqTypeID = tupDesc->attrs[i]->atttypid;
    +        Oid hawqVTypeID = GetVtype(hawqTypeID);
    --- End diff --
    
    Please fix the indentation here: the `hawqVTypeID` line is not aligned with the line above it.


---

Reply via email to