Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21118

@cloud-fan and @jose-torres: I looked at `explain codegen` for reading from a Parquet table (with vectorized reads disabled) and it doesn't look like there is a dependency on `UnsafeRow`:

```
explain codegen select * from test
```

```
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*FileScan parquet rblue.test[id#40L,data#41] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://bucket/warehouse/blue.db/test/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,data:string>

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 009 */   private scala.collection.Iterator scan_input;
/* 010 */
/* 011 */   public GeneratedIterator(Object[] references) {
/* 012 */     this.references = references;
/* 013 */   }
/* 014 */
/* 015 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 016 */     partitionIndex = index;
/* 017 */     this.inputs = inputs;
/* 018 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 019 */     scan_input = inputs[0];
/* 020 */
/* 021 */   }
/* 022 */
/* 023 */   protected void processNext() throws java.io.IOException {
/* 024 */     while (scan_input.hasNext()) {
/* 025 */       InternalRow scan_row = (InternalRow) scan_input.next();
/* 026 */       scan_numOutputRows.add(1);
/* 027 */       append(scan_row);
/* 028 */       if (shouldStop()) return;
/* 029 */     }
/* 030 */   }
/* 031 */ }
```

I've looked at a few simple queries with filters, projects, and aggregation, and it doesn't look like any of the generated code depends on `UnsafeRow`.
Can anyone confirm that it is not a requirement to pass `UnsafeRow` into generated code? If there is no requirement for the rows to be `UnsafeRow`, then is it necessary to add an `UnsafeProjection`, or would the copy to unsafe make execution slower?

If the rows passed from the data source are `UnsafeRow`, then `UnsafeProjection` detects it and copies the row buffer (see [examples](https://github.com/Netflix/iceberg/blob/parquet-value-readers/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java#L256-L262)). That's faster than copying individual values, but slower than just using the row as-is. Not adding a projection would make this case faster.

If the rows passed from the data source are `InternalRow` and not `UnsafeRow`, then we *could* copy them immediately, but it is very likely that the data is already going to be copied. A projection, for example, immediately copies all of the data out of the `UnsafeRow`, so an initial copy to unsafe is just extra work. Similarly, a filter is probably selective enough that it makes sense to wait until after the filter runs to copy the entire row of data to unsafe.

In all of the cases that I've looked at, a copy to unsafe would only slow down execution. Unsafe rows may have better cache locality, but the copy reads *all* of the data anyway.

If I'm right and we do not need to insert a copy to `UnsafeRow`, then we don't need the `SupportsScanUnsafeRow` trait. Data sources can still produce `UnsafeRow` and it would work without a problem. The only time we need to know that an `InternalRow` is actually an `UnsafeRow` is when we are adding a projection to unsafe, in which case we could avoid a copy of the row buffer.
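To make the two copy paths concrete, here is a rough sketch of what inserting a to-unsafe projection looks like, using Spark's internal Catalyst API (`UnsafeProjection.create` is the real entry point; the schema and values are made up for illustration, and this requires Spark on the classpath):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical schema matching the test table above.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("data", StringType)))

val toUnsafe = UnsafeProjection.create(schema)

// Case 1: the source produced a generic InternalRow. The projection
// copies each field individually into a fresh unsafe buffer.
val generic: InternalRow =
  new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))
val copied: UnsafeRow = toUnsafe(generic)

// Case 2: the source already produced an UnsafeRow. As described
// above, the generated projection can copy the backing row buffer
// wholesale instead of field by field -- faster, but still a copy
// that using the row as-is would avoid.
val again: UnsafeRow = toUnsafe(copied)
```

Either way the projection pays for reading and writing every field, which is the extra work the comment argues against when downstream operators (a projection, a selective filter) are going to copy the data anyway.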