Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6959#discussion_r33516110
  
    --- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
 ---
    @@ -70,67 +92,38 @@
       private final boolean enablePerfMetrics;
     
       /**
    -   * @return true if UnsafeFixedWidthAggregationMap supports grouping keys 
with the given schema,
    -   *         false otherwise.
    -   */
    -  public static boolean supportsGroupKeySchema(StructType schema) {
    -    for (StructField field: schema.fields()) {
    -      if (!UnsafeRow.readableFieldTypes.contains(field.dataType())) {
    -        return false;
    -      }
    -    }
    -    return true;
    -  }
    -
    -  /**
    -   * @return true if UnsafeFixedWidthAggregationMap supports aggregation 
buffers with the given
    -   *         schema, false otherwise.
    -   */
    -  public static boolean supportsAggregationBufferSchema(StructType schema) 
{
    -    for (StructField field: schema.fields()) {
    -      if (!UnsafeRow.settableFieldTypes.contains(field.dataType())) {
    -        return false;
    -      }
    -    }
    -    return true;
    -  }
    -
    -  /**
        * Create a new UnsafeFixedWidthAggregationMap.
        *
    -   * @param emptyAggregationBuffer the default value for new keys (a 
"zero" of the agg. function)
    -   * @param aggregationBufferSchema the schema of the aggregation buffer, 
used for row conversion.
    -   * @param groupingKeySchema the schema of the grouping key, used for row 
conversion.
    +   * @param initProjection the default value for new keys (a "zero" of the 
agg. function)
    +   * @param keyConverter the converter of the grouping key, used for row 
conversion.
    +   * @param bufferConverter the converter of the aggregation buffer, used 
for row conversion.
        * @param memoryManager the memory manager used to allocate our Unsafe 
memory structures.
        * @param initialCapacity the initial capacity of the map (a sizing hint 
to avoid re-hashing).
        * @param enablePerfMetrics if true, performance metrics will be 
recorded (has minor perf impact)
        */
       public UnsafeFixedWidthAggregationMap(
    -      InternalRow emptyAggregationBuffer,
    -      StructType aggregationBufferSchema,
    -      StructType groupingKeySchema,
    +      Function1<InternalRow, InternalRow> initProjection,
    +      UnsafeRowConverter keyConverter,
    +      UnsafeRowConverter bufferConverter,
           TaskMemoryManager memoryManager,
           int initialCapacity,
           boolean enablePerfMetrics) {
    -    this.emptyAggregationBuffer =
    -      convertToUnsafeRow(emptyAggregationBuffer, aggregationBufferSchema);
    -    this.aggregationBufferSchema = aggregationBufferSchema;
    -    this.groupingKeyToUnsafeRowConverter = new 
UnsafeRowConverter(groupingKeySchema);
    -    this.groupingKeySchema = groupingKeySchema;
    -    this.map = new BytesToBytesMap(memoryManager, initialCapacity, 
enablePerfMetrics);
    +    this.initProjection = initProjection;
    +    this.keyConverter = keyConverter;
    +    this.bufferConverter = bufferConverter;
         this.enablePerfMetrics = enablePerfMetrics;
    -  }
     
    -  /**
    -   * Convert a Java object row into an UnsafeRow, allocating it into a new 
byte array.
    -   */
    -  private static byte[] convertToUnsafeRow(InternalRow javaRow, StructType 
schema) {
    -    final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
    -    final byte[] unsafeRow = new 
byte[converter.getSizeRequirement(javaRow)];
    -    final int writtenLength =
    -      converter.writeRow(javaRow, unsafeRow, 
PlatformDependent.BYTE_ARRAY_OFFSET);
    -    assert (writtenLength == unsafeRow.length): "Size requirement 
calculation was wrong!";
    -    return unsafeRow;
    +    this.map = new BytesToBytesMap(memoryManager, initialCapacity, 
enablePerfMetrics);
    +    this.keyPool = new UniqueObjectPool(100);
    +    this.bufferPool = new ObjectPool(initialCapacity);
    +
    +    InternalRow initRow = initProjection.apply(emptyRow);
    +    this.emptyBuffer = new 
byte[bufferConverter.getSizeRequirement(initRow)];
    +    int writtenLength = bufferConverter.writeRow(
    +      initRow, emptyBuffer, PlatformDependent.BYTE_ARRAY_OFFSET, 
bufferPool);
    +    assert (writtenLength == emptyBuffer.length): "Size requirement 
calculation was wrong!";
    +    // re-use the empty buffer only when there is no object saved in pool.
    +    reuseEmptyBuffer = bufferPool.size() == 0;
    --- End diff --
    
    Looked more closely and this is fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to