This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new de9f425f8  fix: [native_scans] Support `CASE_SENSITIVE` when reading Parquet (#1782)
de9f425f8 is described below

commit de9f425f8f3c7af1d837732bd315d27d7ec1edf3
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue May 27 08:07:51 2025 -0600

    fix: [native_scans] Support `CASE_SENSITIVE` when reading Parquet (#1782)
---
 common/src/main/java/org/apache/comet/parquet/Native.java |  3 ++-
 .../java/org/apache/comet/parquet/NativeBatchReader.java  |  8 +++++++-
 native/core/src/execution/planner.rs                      |  1 +
 native/core/src/parquet/mod.rs                            |  4 +++-
 native/core/src/parquet/parquet_exec.rs                   | 11 ++++++++---
 native/core/src/parquet/parquet_support.rs                | 15 ++++++++++++---
 native/proto/src/proto/operator.proto                     |  1 +
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala     |  1 +
 .../org/apache/comet/exec/CometNativeReaderSuite.scala    | 14 ++++++++++++++
 9 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index d4056c9ed..9070487ff 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -257,7 +257,8 @@ public final class Native extends NativeBase {
       byte[] requiredSchema,
       byte[] dataSchema,
       String sessionTimezone,
-      int batchSize);
+      int batchSize,
+      boolean caseSensitive);
 
   // arrow native version of read batch
   /**
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 8865e6f33..7a6a1d714 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -65,6 +65,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetColumn;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -405,6 +406,10 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
           conf.getInt(
               CometConf.COMET_BATCH_SIZE().key(),
               (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
+      boolean caseSensitive =
+          conf.getBoolean(
+              SQLConf.CASE_SENSITIVE().key(),
+              (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
       this.handle =
           Native.initRecordBatchReader(
               filePath,
@@ -415,7 +420,8 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
               serializedRequestedArrowSchema,
               serializedDataArrowSchema,
               timeZoneId,
-              batchSize);
+              batchSize,
+              caseSensitive);
     }
     isInitialized = true;
   }
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 202c187d3..60587a6fb 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1190,6 +1190,7 @@ impl PhysicalPlanner {
                     Some(data_filters?),
                     default_values,
                     scan.session_timezone.as_str(),
+                    scan.case_sensitive,
                 )?;
                 Ok((
                     vec![],
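
The new `boolean caseSensitive` parameter crosses the JNI boundary as a `jboolean` (a `u8` on the Rust side); the `mod.rs` hunk below converts it with `case_sensitive != JNI_FALSE`. A minimal sketch of that conversion idiom, assuming only the `jni` crate (the helper name here is illustrative, not part of the patch):

    use jni::sys::{jboolean, JNI_FALSE, JNI_TRUE};

    // Convert a JNI boolean (a u8) to a Rust bool. Comparing against
    // JNI_FALSE treats any non-zero value as true, which is safer than
    // testing equality with JNI_TRUE (1).
    fn jboolean_to_bool(b: jboolean) -> bool {
        b != JNI_FALSE
    }

    fn main() {
        assert!(jboolean_to_bool(JNI_TRUE));
        assert!(!jboolean_to_bool(JNI_FALSE));
    }
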
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 54fe23f59..b24591e9d 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -60,7 +60,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
-use jni::sys::jstring;
+use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema};
@@ -657,6 +657,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     data_schema: jbyteArray,
     session_timezone: jstring,
     batch_size: jint,
+    case_sensitive: jboolean,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -717,6 +718,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             data_filters,
             None,
             session_timezone.as_str(),
+            case_sensitive != JNI_FALSE,
         )?;
 
         let partition_index: usize = 0;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index f655f9157..13961ebf9 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -65,8 +65,10 @@ pub(crate) fn init_datasource_exec(
     data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
     default_values: Option<HashMap<usize, ScalarValue>>,
     session_timezone: &str,
+    case_sensitive: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
-    let (table_parquet_options, spark_parquet_options) = get_options(session_timezone);
+    let (table_parquet_options, spark_parquet_options) =
+        get_options(session_timezone, case_sensitive);
     let mut parquet_source =
         ParquetSource::new(table_parquet_options).with_schema_adapter_factory(Arc::new(
             SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
@@ -118,7 +120,10 @@ pub(crate) fn init_datasource_exec(
     Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
 }
 
-fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
+fn get_options(
+    session_timezone: &str,
+    case_sensitive: bool,
+) -> (TableParquetOptions, SparkParquetOptions) {
     let mut table_parquet_options = TableParquetOptions::new();
     table_parquet_options.global.pushdown_filters = true;
     table_parquet_options.global.reorder_filters = true;
@@ -126,7 +131,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpti
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
-    spark_parquet_options.case_sensitive = false;
+    spark_parquet_options.case_sensitive = case_sensitive;
 
     (table_parquet_options, spark_parquet_options)
 }
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 0a41efcec..4067afaea 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -211,7 +211,11 @@ fn cast_struct_to_struct(
     // TODO some of this logic may be specific to converting Parquet to Spark
     let mut field_name_to_index_map = HashMap::new();
     for (i, field) in from_fields.iter().enumerate() {
-        field_name_to_index_map.insert(field.name(), i);
+        if parquet_options.case_sensitive {
+            field_name_to_index_map.insert(field.name().clone(), i);
+        } else {
+            field_name_to_index_map.insert(field.name().to_lowercase(), i);
+        }
     }
     assert_eq!(field_name_to_index_map.len(), from_fields.len());
     let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
@@ -219,8 +223,13 @@ fn cast_struct_to_struct(
         // Fields in the to_type schema may not exist in the from_type schema
         // i.e. the required schema may have fields that the file does not
        // have
-        if field_name_to_index_map.contains_key(to_fields[i].name()) {
-            let from_index = field_name_to_index_map[to_fields[i].name()];
+        let key = if parquet_options.case_sensitive {
+            to_fields[i].name().clone()
+        } else {
+            to_fields[i].name().to_lowercase()
+        };
+        if field_name_to_index_map.contains_key(&key) {
+            let from_index = field_name_to_index_map[&key];
             let cast_field = cast_array(
                 Arc::clone(array.column(from_index)),
                 to_fields[i].data_type(),
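
The `parquet_support.rs` change above keys the struct-field index map on lowercased names whenever the scan is case-insensitive, so a required-schema field `A` can resolve against a file that stores `a`. A standalone sketch of that lookup logic, using plain strings in place of Arrow `Field`s (the function names are illustrative, not from the patch):

    use std::collections::HashMap;

    // Build a name -> index map over the file's fields, lowercasing the
    // keys when matching is case-insensitive (Spark's default).
    fn field_index_map(from_fields: &[&str], case_sensitive: bool) -> HashMap<String, usize> {
        from_fields
            .iter()
            .enumerate()
            .map(|(i, name)| {
                let key = if case_sensitive { name.to_string() } else { name.to_lowercase() };
                (key, i)
            })
            .collect()
    }

    // Resolve a required-schema field against the file schema, normalizing
    // the probe key the same way the map keys were normalized.
    fn resolve(required: &str, map: &HashMap<String, usize>, case_sensitive: bool) -> Option<usize> {
        let key = if case_sensitive { required.to_string() } else { required.to_lowercase() };
        map.get(&key).copied()
    }

    fn main() {
        // File schema has "a"; the required schema asks for "A".
        let insensitive = field_index_map(&["a"], false);
        assert_eq!(resolve("A", &insensitive, false), Some(0)); // matches
        let sensitive = field_index_map(&["a"], true);
        assert_eq!(resolve("A", &sensitive, true), None); // no match
    }

As in the patch, both sides of the comparison are normalized the same way, and the real code's `assert_eq!` on the map length would catch a file schema whose names collide after lowercasing.
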
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index e16cdb212..9a41f977a 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -93,6 +93,7 @@ message NativeScan {
   string session_timezone = 9;
   repeated spark.spark_expression.Expr default_values = 10;
   repeated int64 default_values_indexes = 11;
+  bool case_sensitive = 12;
 }
 
 message Projection {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1da3e984b..a98234585 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2370,6 +2370,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
       nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
       nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
       nativeScanBuilder.setSessionTimezone(conf.getConfString("spark.sql.session.timeZone"))
+      nativeScanBuilder.setCaseSensitive(conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
 
       Some(result.setNativeScan(nativeScanBuilder).build())
 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 4115ba432..f8f13f9ad 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -44,6 +44,20 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
     })
   }
 
+  test("native reader case sensitivity") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(true, false).foreach { caseSensitive =>
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+          val tbl = s"case_sensitivity_${caseSensitive}_${System.currentTimeMillis()}"
+          sql(s"create table $tbl (A long) using parquet options (path '" + path + "')")
+          val df = sql(s"select A from $tbl")
+          checkSparkAnswer(df)
+        }
+      }
+    }
+  }
+
   test("native reader - read simple STRUCT fields") {
     testSingleLineQuery(
       """
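
One compatibility note worth spelling out: `operator.proto` uses proto3, so the new `bool case_sensitive = 12` field decodes to `false` when absent. A plan serialized without the field therefore falls back to case-insensitive matching, which agrees with both Spark's `spark.sql.caseSensitive` default and the previously hard-coded behavior. A small illustration with a hand-rolled stand-in for the generated message type (the real type is generated from `operator.proto`; only the relevant fields are shown):

    // Stand-in for the generated NativeScan message. The generated type
    // derives Default, and proto3 scalar fields default to their zero
    // value, so a missing case_sensitive decodes as false.
    #[derive(Debug, Default)]
    struct NativeScan {
        session_timezone: String,
        case_sensitive: bool,
    }

    fn main() {
        let scan = NativeScan::default();
        // A plan serialized before this change carries no field 12, so it
        // decodes to the zero value: case-insensitive matching.
        assert!(!scan.case_sensitive);
    }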