Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10628#discussion_r49400006
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
    @@ -0,0 +1,291 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.vectorized;
    +
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.catalyst.util.ArrayData;
    +import org.apache.spark.sql.catalyst.util.MapData;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +import java.util.Iterator;
    +
    +import org.apache.commons.lang.NotImplementedException;
    +
    +/**
    + * This class is the in memory representation of rows as they are streamed 
through operators. It
    + * is designed to maximize CPU efficiency and not storage footprint. Since 
it is expected that
    + * each operator allocates one of these objects, the storage footprint on 
the task is negligible.
    + *
    + * The layout is columnar, with values encoded in their native format. 
Each ColumnarBatch contains
    + * a horizontal partitioning of the data, split into columns.
    + *
    + * The ColumnarBatch supports either on heap or offheap modes with 
(mostly) the identical API.
    + *
    + * TODO:
    + *  - There are many TODOs for the existing APIs. They should throw a not 
implemented exception.
    + *  - Compaction: The batch and columns should be able to compact based on 
a selection vector.
    + */
    +public final class ColumnarBatch {
    +  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
    +
    +  private final StructType schema;
    +  private final int capacity;
    +  private int numRows;
    +  private final ColumnVector[] columns;
    +
    +  // True if the row is filtered.
    +  private final boolean[] filteredRows;
    +
    +  // Total number of rows that have been filtered.
    +  private int numRowsFiltered = 0;
    +
    +  public static ColumnarBatch allocate(StructType schema, boolean offHeap) 
{
    +    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
    +  }
    +
    +  public static ColumnarBatch allocate(StructType schema, boolean offHeap, 
int maxRows) {
    +    return new ColumnarBatch(schema, maxRows, offHeap);
    +  }
    +
    +  /**
    +   * Called to close all the columns in this batch. It is not valid to 
access the data after
    +   * calling this. This must be called at the end to clean up memory 
allocations.
    +   */
    +  public void close() {
    +    for (ColumnVector c: columns) {
    +      c.close();
    +    }
    +  }
    +
    +  /**
    +   * Adapter class to interop with existing components that expect 
internal row. A lot of
    +   * performance is lost with this translation.
    +   */
    +  public final class Row extends InternalRow {
    +    private int rowId;
    +
    +    /**
    +     * Marks this row as being filtered out. This means a subsequent 
iteration over the rows
    +     * in this batch will not include this row.
    +     */
    +    public final void markFiltered() {
    +      ColumnarBatch.this.markFiltered(rowId);
    +    }
    +
    +    @Override
    +    public final int numFields() {
    +      return ColumnarBatch.this.numCols();
    +    }
    +
    +    @Override
    +    public final InternalRow copy() {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final boolean anyNull() {
    +      return false;
    +    }
    +
    +    @Override
    +    public final boolean isNullAt(int ordinal) {
    +      return ColumnarBatch.this.column(ordinal).getIsNull(rowId);
    +    }
    +
    +    @Override
    +    public final boolean getBoolean(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final byte getByte(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final short getShort(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final int getInt(int ordinal) {
    +      return ColumnarBatch.this.column(ordinal).getInt(rowId);
    +    }
    +
    +    @Override
    +    public final long getLong(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final float getFloat(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final double getDouble(int ordinal) {
    +      return ColumnarBatch.this.column(ordinal).getDouble(rowId);
    +    }
    +
    +    @Override
    +    public final Decimal getDecimal(int ordinal, int precision, int scale) 
{
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final UTF8String getUTF8String(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final byte[] getBinary(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final CalendarInterval getInterval(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final InternalRow getStruct(int ordinal, int numFields) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final ArrayData getArray(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final MapData getMap(int ordinal) {
    +      throw new NotImplementedException();
    +    }
    +
    +    @Override
    +    public final Object get(int ordinal, DataType dataType) {
    +      throw new NotImplementedException();
    +    }
    +  }
    +
    +  /**
    +   * Returns an iterator over the rows in this batch. This skips rows that 
are filtered out.
    +   */
    +  public Iterator<Row> rowIterator() {
    +    final int maxRows = ColumnarBatch.this.numRows();
    +    final Row row = new Row();
    +    return new Iterator<Row>() {
    +      int rowId = 0;
    +
    +      @Override
    +      public boolean hasNext() {
    +        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
    +          ++rowId;
    +        }
    +        return rowId < maxRows;
    +      }
    +
    +      @Override
    +      public Row next() {
    +        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
    +          ++rowId;
    +        }
    +        row.rowId = rowId++;
    +        return row;
    +      }
    +    };
    +  }
    +
    +  /**
    +   * Resets the batch for writing.
    +   */
    +  public void reset() {
    +    for (int i = 0; i < numCols(); ++i) {
    +      columns[i].reset();
    +    }
    +    if (this.numRowsFiltered > 0) {
    +      for (int i = 0; i < numRows; ++i) {
    +        filteredRows[i] = false;
    --- End diff --
    
    Yea - there is also a variant of it that can take an object with an offset.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to