Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154475350
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    While this technically works, it is a bit inefficient. Above, we write only the portion of the SV2 buffer that is actually used. Here, however, the buffers were grabbed generically: unless the write position is set correctly on each buffer, we don't know how much of the buffer actually contains data, so we may be writing the whole buffer, including unused, internally-fragmented space.
    
    If the write position is used to limit the write size, then the buffer's write position should work just as well for the SV2, and we would not need the manual computation of the used buffer size.
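
    For illustration only, a minimal sketch of that idea (not the PR's code; it assumes each DrillBuf's writer index, including the SV2 buffer's, is kept equal to the actual data length, which, as noted above, may not hold today):

        // Hypothetical sketch: rely on each buffer's writer index to bound the write.
        if (sv2 != null) {
          final DrillBuf sv2Buf = sv2.getBuffer(false);
          // Write only the bytes actually written into the SV2 buffer.
          channel.write(sv2Buf.nioBuffer(0, sv2Buf.writerIndex()));
        }
        for (DrillBuf buf : incomingBuffers) {
          // Write only up to the writer index, not the full (possibly fragmented) capacity.
          channel.write(buf.nioBuffer(0, buf.writerIndex()));
        }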
    
    Perhaps the old code worked this way. But since we are looking for greater performance, I wonder whether we should handle the issue here. (At present, Parquet produces batches with, say, 5% data and 95% unused space. Do we want to spill all that unused space?)
    
    Plus, since we spill the extra space and do power-of-two rounding on read, this probably explains why the sort must allow roughly twice as much memory for a deserialized spill batch as seems necessary from the sum of the data blocks.
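
    (As a rough, hypothetical illustration: a vector holding a bit over 4 MB of data in an 8 MB buffer would be spilled as the full 8 MB; on read, power-of-two rounding again allocates 8 MB, so the sort ends up budgeting nearly 2x the actual data size for that batch.)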

