andygrove commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1901370341
########## native/core/src/execution/shuffle/shuffle_writer.rs: ##########
@@ -1567,17 +1585,41 @@ pub fn write_ipc_compressed<W: Write + Seek>(
     let mut timer = ipc_time.timer();
     let start_pos = output.stream_position()?;
 
-    // write ipc_length placeholder
-    output.write_all(&[0u8; 8])?;
+    // seek past ipc_length placeholder
+    output.seek_relative(8)?;
+
+    // write number of columns because JVM side needs to know how many addresses to allocate
+    let field_count = batch.schema().fields().len();
+    output.write_all(&field_count.to_le_bytes())?;
 
     let output = match codec {
         CompressionCodec::None => {
+            output.write_all(b"NONE")?;
             let mut arrow_writer = StreamWriter::try_new(output, &batch.schema())?;
             arrow_writer.write(batch)?;
             arrow_writer.finish()?;
             arrow_writer.into_inner()?
         }
+        CompressionCodec::Snappy => {
+            output.write_all(b"SNAP")?;
+            let mut wtr = snap::write::FrameEncoder::new(output);
+            let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?;
+            arrow_writer.write(batch)?;
+            arrow_writer.finish()?;
+            wtr.into_inner()
+                .map_err(|e| DataFusionError::Execution(format!("snappy compression error: {}", e)))?
+        }
+        CompressionCodec::Lz4Frame => {
+            output.write_all(b"LZ4_")?;
+            let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
+            let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?;

Review Comment:
Feel free to experiment, and let's see what else we can learn. I'm not sure exactly what you are planning to try, but it seems that we could just have one writer per `PartitionBuffer` and write multiple batches to it. That may complicate the spill logic, but you can just disable spilling while testing the idea. I wonder how this would differ from simply increasing the shuffle batch size so that we write larger batches?
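To make the block layout in this diff concrete, here is a minimal std-only sketch. It frames an already-encoded payload as [8-byte length][8-byte column count][4-byte codec tag][payload], mirroring the `NONE`/`SNAP`/`LZ4_` markers above. The `Codec` enum, `write_block`, and `read_block` are hypothetical names, not Comet code. Two further points are assumptions: that the length placeholder is backfilled after the payload and counts every byte following the length field (the diff is cut off before that step), and that the column count is 8 bytes wide. The sketch writes it as an explicit `u64`, whereas the diff writes a `usize`, which is 8 bytes only on 64-bit targets.

```rust
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical codec enum; the 4-byte tags mirror the markers written in the diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    None,
    Snappy,
    Lz4Frame,
}

impl Codec {
    fn tag(self) -> &'static [u8; 4] {
        match self {
            Codec::None => b"NONE",
            Codec::Snappy => b"SNAP",
            Codec::Lz4Frame => b"LZ4_",
        }
    }
}

/// Writes [u64 LE length][u64 LE column count][4-byte codec tag][payload] and
/// returns the total number of bytes in the block.
/// Assumption: the length is backfilled afterwards and counts every byte that
/// follows the 8-byte length field.
fn write_block<W: Write + Seek>(
    out: &mut W,
    field_count: u64,
    codec: Codec,
    payload: &[u8],
) -> io::Result<u64> {
    let start = out.stream_position()?;
    // Skip the length placeholder, as the diff does with `seek_relative(8)`.
    out.seek(SeekFrom::Current(8))?;
    out.write_all(&field_count.to_le_bytes())?;
    out.write_all(codec.tag())?;
    // In the real writer this would be the (possibly compressed) Arrow IPC stream.
    out.write_all(payload)?;
    let end = out.stream_position()?;
    // Backfill the length, then restore the position for the next block.
    out.seek(SeekFrom::Start(start))?;
    out.write_all(&(end - start - 8).to_le_bytes())?;
    out.seek(SeekFrom::Start(end))?;
    Ok(end - start)
}

/// Reads one block back as (column count, codec tag, payload).
fn read_block<R: Read>(inp: &mut R) -> io::Result<(u64, [u8; 4], Vec<u8>)> {
    let mut word = [0u8; 8];
    inp.read_exact(&mut word)?;
    let len = u64::from_le_bytes(word);
    inp.read_exact(&mut word)?;
    let field_count = u64::from_le_bytes(word);
    let mut tag = [0u8; 4];
    inp.read_exact(&mut tag)?;
    // 12 = 8-byte column count + 4-byte codec tag.
    let payload_len = len
        .checked_sub(12)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "block too short"))?;
    let mut payload = vec![0u8; payload_len as usize];
    inp.read_exact(&mut payload)?;
    Ok((field_count, tag, payload))
}
```

Because every call to `write_block` pays for its own header, and in the diff each call also creates a fresh `StreamWriter` that re-emits the schema, the reviewer's question applies directly: one long-lived writer per `PartitionBuffer` would pay that overhead once per stream rather than once per batch, much as a larger shuffle batch size would reduce the number of blocks.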