Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13680#discussion_r79654687
  
    --- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
    @@ -33,134 +37,213 @@
       // The offset of the global buffer where we start to write this array.
       private int startingOffset;
     
    -  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
    -    // We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
    -    final int fixedSize = 4 + 4 * numElements;
    +  // The number of elements in this array
    +  private int numElements;
    +
    +  private int headerInBytes;
    +
    +  private void assertIndexIsValid(int index) {
    +    assert index >= 0 : "index (" + index + ") should >= 0";
    +    assert index < numElements : "index (" + index + ") should < " + 
numElements;
    +  }
    +
    +  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
    +    // We need 8 bytes to store numElements in header
    +    this.numElements = numElements;
    +    this.headerInBytes = calculateHeaderPortionInBytes(numElements);
     
         this.holder = holder;
         this.startingOffset = holder.cursor;
     
    -    holder.grow(fixedSize);
    -    Platform.putInt(holder.buffer, holder.cursor, numElements);
    -    holder.cursor += fixedSize;
    +    // Grows the global buffer ahead for header and fixed size data.
    +    int fixedPartInBytes =
    +      ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
    +    holder.grow(headerInBytes + fixedPartInBytes);
    +
    +    // Write numElements and clear out null bits to header
    +    Platform.putLong(holder.buffer, startingOffset, numElements);
    +    for (int i = 8; i < headerInBytes; i += 8) {
    +      Platform.putLong(holder.buffer, startingOffset + i, 0L);
    +    }
    +
    +    // fill 0 into remainder part of 8-bytes alignment in unsafe array
    +    for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
    +      Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
    +    }
    +    holder.cursor += (headerInBytes + fixedPartInBytes);
    +  }
    +
    +  private void zeroOutPaddingBytes(int numBytes) {
    +    if ((numBytes & 0x07) > 0) {
    +      Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
    +    }
    +  }
    +
    +  private long getElementOffset(int ordinal, int elementSize) {
    +    return startingOffset + headerInBytes + ordinal * elementSize;
    +  }
    +
    +  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
    +    assertIndexIsValid(ordinal);
    +    final long relativeOffset = currentCursor - startingOffset;
    +    final long offsetAndSize = (relativeOffset << 32) | size;
     
    -    // Grows the global buffer ahead for fixed size data.
    -    holder.grow(fixedElementSize * numElements);
    +    write(ordinal, offsetAndSize);
       }
     
    -  private long getElementOffset(int ordinal) {
    -    return startingOffset + 4 + 4 * ordinal;
    +  private void setNullBit(int ordinal) {
    +    assertIndexIsValid(ordinal);
    +    BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
       }
     
    -  public void setNullAt(int ordinal) {
    -    final int relativeOffset = holder.cursor - startingOffset;
    -    // Writes negative offset value to represent null element.
    -    Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
    +  public void setNullBoolean(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
       }
     
    -  public void setOffset(int ordinal) {
    -    final int relativeOffset = holder.cursor - startingOffset;
    -    Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
    +  public void setNullByte(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
       }
     
    +  public void setNullShort(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
    +  }
    +
    +  public void setNullInt(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
    +  }
    +
    +  public void setNullLong(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), (long)0);
    +  }
    +
    +  public void setNullFloat(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putFloat(holder.buffer, getElementOffset(ordinal, 4), 
(float)0);
    +  }
    +
    +  public void setNullDouble(int ordinal) {
    +    setNullBit(ordinal);
    +    // put zero into the corresponding field when set null
    +    Platform.putDouble(holder.buffer, getElementOffset(ordinal, 8), 
(double)0);
    +  }
    +
    +  public void setNull(int ordinal) { setNullLong(ordinal); }
    +
       public void write(int ordinal, boolean value) {
    -    Platform.putBoolean(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 1;
    +    assertIndexIsValid(ordinal);
    +    Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
value);
       }
     
       public void write(int ordinal, byte value) {
    -    Platform.putByte(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 1;
    +    assertIndexIsValid(ordinal);
    +    Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), value);
       }
     
       public void write(int ordinal, short value) {
    -    Platform.putShort(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 2;
    +    assertIndexIsValid(ordinal);
    +    Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), value);
       }
     
       public void write(int ordinal, int value) {
    -    Platform.putInt(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 4;
    +    assertIndexIsValid(ordinal);
    +    Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), value);
       }
     
       public void write(int ordinal, long value) {
    -    Platform.putLong(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 8;
    +    assertIndexIsValid(ordinal);
    +    Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), value);
       }
     
       public void write(int ordinal, float value) {
         if (Float.isNaN(value)) {
           value = Float.NaN;
         }
    -    Platform.putFloat(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 4;
    +    assertIndexIsValid(ordinal);
    +    Platform.putFloat(holder.buffer, getElementOffset(ordinal, 4), value);
       }
     
       public void write(int ordinal, double value) {
         if (Double.isNaN(value)) {
           value = Double.NaN;
         }
    -    Platform.putDouble(holder.buffer, holder.cursor, value);
    -    setOffset(ordinal);
    -    holder.cursor += 8;
    +    assertIndexIsValid(ordinal);
    +    Platform.putDouble(holder.buffer, getElementOffset(ordinal, 8), value);
       }
     
       public void write(int ordinal, Decimal input, int precision, int scale) {
         // make sure Decimal object has the same scale as DecimalType
    +    assertIndexIsValid(ordinal);
         if (input.changePrecision(precision, scale)) {
           if (precision <= Decimal.MAX_LONG_DIGITS()) {
    -        Platform.putLong(holder.buffer, holder.cursor, 
input.toUnscaledLong());
    -        setOffset(ordinal);
    -        holder.cursor += 8;
    +        write(ordinal, input.toUnscaledLong());
           } else {
             final byte[] bytes = 
input.toJavaBigDecimal().unscaledValue().toByteArray();
    -        assert bytes.length <= 16;
    -        holder.grow(bytes.length);
    +        final int numBytes = bytes.length;
    +        assert numBytes <= 16;
    +        int roundedSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
    +        holder.grow(roundedSize);
    +
    +        zeroOutPaddingBytes(numBytes);
     
             // Write the bytes to the variable length portion.
             Platform.copyMemory(
    -          bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, 
bytes.length);
    -        setOffset(ordinal);
    -        holder.cursor += bytes.length;
    +          bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, 
numBytes);
    +        setOffsetAndSize(ordinal, holder.cursor, (long)numBytes);
    +
    +        // move the cursor forward with 8-bytes boundary
    +        holder.cursor += roundedSize;
           }
         } else {
    -      setNullAt(ordinal);
    +      setNull(ordinal);
         }
       }
     
       public void write(int ordinal, UTF8String input) {
         final int numBytes = input.numBytes();
    +    final int roundedSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
     
         // grow the global buffer before writing data.
    -    holder.grow(numBytes);
    +    holder.grow(roundedSize);
    +
    +    zeroOutPaddingBytes(numBytes);
     
         // Write the bytes to the variable length portion.
         input.writeToMemory(holder.buffer, holder.cursor);
     
    -    setOffset(ordinal);
    +    write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | 
((long) numBytes));
    --- End diff --
    
    Use `setOffsetAndSize` here instead of packing the offset and size by hand? This is not the only place; please fix all of them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to