This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 19865b3271 chore: Finish refactor with `assert_or_internal_err!()` 
(#18790)
19865b3271 is described below

commit 19865b327182e2c4d938d5ae6b1abe81cd2e64eb
Author: Yongting You <[email protected]>
AuthorDate: Tue Nov 18 16:54:51 2025 +0800

    chore: Finish refactor with `assert_or_internal_err!()` (#18790)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes https://github.com/apache/datafusion/issues/18613
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    #18613 is almost finished, I searched the codebase and refactor all the
    remaining patterns in this PR.
    
    Such assertion macros have been scattered to the codebase, and I have
    also added some error handling doc in
    https://github.com/apache/datafusion/pull/18762, so later we can follow
    this pattern and continue adopting those macros.
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Martin Grigorov <[email protected]>
---
 datafusion/catalog-listing/src/helpers.rs          |  16 +--
 datafusion/common/src/scalar/mod.rs                | 107 +++++++++------------
 datafusion/common/src/utils/mod.rs                 |   9 +-
 datafusion/core/src/physical_planner.rs            |  10 +-
 .../core/tests/parquet/external_access_plan.rs     |   5 +-
 .../core/tests/user_defined/user_defined_plan.rs   |  19 ++--
 datafusion/datasource-parquet/src/access_plan.rs   |  17 ++--
 datafusion/optimizer/src/push_down_filter.rs       |   8 +-
 datafusion/proto/src/common.rs                     |  14 +--
 datafusion/proto/src/logical_plan/mod.rs           |  13 ++-
 datafusion/pruning/src/pruning_predicate.rs        |  20 ++--
 .../spark/src/function/datetime/make_interval.rs   |  43 +++++----
 datafusion/spark/src/function/math/modulus.rs      |  18 ++--
 datafusion/spark/src/function/math/rint.rs         |   4 +-
 14 files changed, 148 insertions(+), 155 deletions(-)

diff --git a/datafusion/catalog-listing/src/helpers.rs 
b/datafusion/catalog-listing/src/helpers.rs
index 089457648d..22e7002f90 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -21,8 +21,9 @@ use std::mem;
 use std::sync::Arc;
 
 use datafusion_catalog::Session;
-use datafusion_common::internal_err;
-use datafusion_common::{HashMap, Result, ScalarValue};
+use datafusion_common::{
+    assert_or_internal_err, DataFusionError, HashMap, Result, ScalarValue,
+};
 use datafusion_datasource::ListingTableUrl;
 use datafusion_datasource::PartitionedFile;
 use datafusion_expr::{lit, utils, BinaryExpr, Operator};
@@ -386,12 +387,11 @@ pub async fn pruned_partition_list<'a>(
         .try_filter(|object_meta| futures::future::ready(object_meta.size > 
0));
 
     if partition_cols.is_empty() {
-        if !filters.is_empty() {
-            return internal_err!(
-                "Got partition filters for unpartitioned table {}",
-                table_path
-            );
-        }
+        assert_or_internal_err!(
+            filters.is_empty(),
+            "Got partition filters for unpartitioned table {}",
+            table_path
+        );
 
         // if no partition col => simply list all the files
         Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
diff --git a/datafusion/common/src/scalar/mod.rs 
b/datafusion/common/src/scalar/mod.rs
index fadd2e41ea..787bd78b1d 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -33,6 +33,7 @@ use std::mem::{size_of, size_of_val};
 use std::str::FromStr;
 use std::sync::Arc;
 
+use crate::assert_or_internal_err;
 use crate::cast::{
     as_binary_array, as_binary_view_array, as_boolean_array, as_date32_array,
     as_date64_array, as_decimal128_array, as_decimal256_array, 
as_decimal32_array,
@@ -78,8 +79,8 @@ use arrow::compute::kernels::numeric::{
 use arrow::datatypes::{
     i256, validate_decimal_precision_and_scale, ArrowDictionaryKeyType, 
ArrowNativeType,
     ArrowTimestampType, DataType, Date32Type, Decimal128Type, Decimal256Type,
-    Decimal32Type, Decimal64Type, Field, Float32Type, Int16Type, Int32Type, 
Int64Type,
-    Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano,
+    Decimal32Type, Decimal64Type, DecimalType, Field, Float32Type, Int16Type, 
Int32Type,
+    Int64Type, Int8Type, IntervalDayTime, IntervalDayTimeType, 
IntervalMonthDayNano,
     IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
     TimestampMicrosecondType, TimestampMillisecondType, 
TimestampNanosecondType,
     TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, 
UnionFields,
@@ -1578,12 +1579,10 @@ impl ScalarValue {
             DataType::Float32 => ScalarValue::Float32(Some(1.0)),
             DataType::Float64 => ScalarValue::Float64(Some(1.0)),
             DataType::Decimal32(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal32Type>(
+                Self::validate_decimal_or_internal_err::<Decimal32Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match 10_i32.checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1592,12 +1591,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal64(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal64Type>(
+                Self::validate_decimal_or_internal_err::<Decimal64Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i64::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1606,12 +1603,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal128(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal128Type>(
+                Self::validate_decimal_or_internal_err::<Decimal128Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i128::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal128(Some(value), *precision, 
*scale)
@@ -1620,12 +1615,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal256(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal256Type>(
+                Self::validate_decimal_or_internal_err::<Decimal256Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i256::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal256(Some(value), *precision, 
*scale)
@@ -1652,12 +1645,10 @@ impl ScalarValue {
             DataType::Float32 => ScalarValue::Float32(Some(-1.0)),
             DataType::Float64 => ScalarValue::Float64(Some(-1.0)),
             DataType::Decimal32(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal32Type>(
+                Self::validate_decimal_or_internal_err::<Decimal32Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match 10_i32.checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal32(Some(-value), *precision, 
*scale)
@@ -1666,12 +1657,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal64(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal64Type>(
+                Self::validate_decimal_or_internal_err::<Decimal64Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i64::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal64(Some(-value), *precision, 
*scale)
@@ -1680,12 +1669,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal128(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal128Type>(
+                Self::validate_decimal_or_internal_err::<Decimal128Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i128::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal128(Some(-value), *precision, 
*scale)
@@ -1694,12 +1681,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal256(precision, scale) => {
-                validate_decimal_precision_and_scale::<Decimal256Type>(
+                Self::validate_decimal_or_internal_err::<Decimal256Type>(
                     *precision, *scale,
                 )?;
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i256::from(10).checked_pow(*scale as u32) {
                     Some(value) => {
                         ScalarValue::Decimal256(Some(-value), *precision, 
*scale)
@@ -1729,14 +1714,10 @@ impl ScalarValue {
             DataType::Float32 => ScalarValue::Float32(Some(10.0)),
             DataType::Float64 => ScalarValue::Float64(Some(10.0)),
             DataType::Decimal32(precision, scale) => {
-                if let Err(err) = 
validate_decimal_precision_and_scale::<Decimal32Type>(
+                Self::validate_decimal_or_internal_err::<Decimal32Type>(
                     *precision, *scale,
-                ) {
-                    return _internal_err!("Invalid precision and scale {err}");
-                }
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                )?;
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match 10_i32.checked_pow((*scale + 1) as u32) {
                     Some(value) => {
                         ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1745,14 +1726,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal64(precision, scale) => {
-                if let Err(err) = 
validate_decimal_precision_and_scale::<Decimal64Type>(
+                Self::validate_decimal_or_internal_err::<Decimal64Type>(
                     *precision, *scale,
-                ) {
-                    return _internal_err!("Invalid precision and scale {err}");
-                }
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                )?;
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i64::from(10).checked_pow((*scale + 1) as u32) {
                     Some(value) => {
                         ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1761,14 +1738,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal128(precision, scale) => {
-                if let Err(err) = 
validate_decimal_precision_and_scale::<Decimal128Type>(
+                Self::validate_decimal_or_internal_err::<Decimal128Type>(
                     *precision, *scale,
-                ) {
-                    return _internal_err!("Invalid precision and scale {err}");
-                }
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                )?;
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i128::from(10).checked_pow((*scale + 1) as u32) {
                     Some(value) => {
                         ScalarValue::Decimal128(Some(value), *precision, 
*scale)
@@ -1777,14 +1750,10 @@ impl ScalarValue {
                 }
             }
             DataType::Decimal256(precision, scale) => {
-                if let Err(err) = 
validate_decimal_precision_and_scale::<Decimal256Type>(
+                Self::validate_decimal_or_internal_err::<Decimal256Type>(
                     *precision, *scale,
-                ) {
-                    return _internal_err!("Invalid precision and scale {err}");
-                }
-                if *scale < 0 {
-                    return _internal_err!("Negative scale is not supported");
-                }
+                )?;
+                assert_or_internal_err!(*scale >= 0, "Negative scale is not 
supported");
                 match i256::from(10).checked_pow((*scale + 1) as u32) {
                     Some(value) => {
                         ScalarValue::Decimal256(Some(value), *precision, 
*scale)
@@ -4354,6 +4323,20 @@ impl ScalarValue {
             _ => None,
         }
     }
+
+    /// A thin wrapper on Arrow's validation that throws internal error if 
validation
+    /// fails.
+    fn validate_decimal_or_internal_err<T: DecimalType>(
+        precision: u8,
+        scale: i8,
+    ) -> Result<()> {
+        validate_decimal_precision_and_scale::<T>(precision, 
scale).map_err(|err| {
+            _internal_datafusion_err!(
+                "Decimal precision/scale invariant violated \
+                 (precision={precision}, scale={scale}): {err}"
+            )
+        })
+    }
 }
 
 /// Compacts the data of an `ArrayData` into a new `ArrayData`.
diff --git a/datafusion/common/src/utils/mod.rs 
b/datafusion/common/src/utils/mod.rs
index 7b145ac3ae..6e7396a7c5 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -22,8 +22,9 @@ pub mod memory;
 pub mod proxy;
 pub mod string_utils;
 
-use crate::error::{_exec_datafusion_err, _internal_datafusion_err, 
_internal_err};
-use crate::{Result, ScalarValue};
+use crate::assert_or_internal_err;
+use crate::error::{_exec_datafusion_err, _internal_datafusion_err};
+use crate::{DataFusionError, Result, ScalarValue};
 use arrow::array::{
     cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, 
ListArray,
     OffsetSizeTrait,
@@ -519,9 +520,7 @@ pub fn arrays_into_list_array(
     arr: impl IntoIterator<Item = ArrayRef>,
 ) -> Result<ListArray> {
     let arr = arr.into_iter().collect::<Vec<_>>();
-    if arr.is_empty() {
-        return _internal_err!("Cannot wrap empty array into list array");
-    }
+    assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into 
list array");
 
     let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
     // Assume data type is consistent
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 6bcc45e9e4..914d411ad6 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1757,11 +1757,11 @@ fn qualify_join_schema_sides(
     let join_fields = join_schema.fields();
 
     // Validate lengths
-    if join_fields.len() != left_fields.len() + right_fields.len() {
-        return internal_err!(
-            "Join schema field count must match left and right field count."
-        );
-    }
+    assert_eq_or_internal_err!(
+        join_fields.len(),
+        left_fields.len() + right_fields.len(),
+        "Join schema field count must match left and right field count."
+    );
 
     // Validate field names match
     for (i, (field, expected)) in join_fields
diff --git a/datafusion/core/tests/parquet/external_access_plan.rs 
b/datafusion/core/tests/parquet/external_access_plan.rs
index b35cb6e09c..9022b25356 100644
--- a/datafusion/core/tests/parquet/external_access_plan.rs
+++ b/datafusion/core/tests/parquet/external_access_plan.rs
@@ -257,7 +257,10 @@ async fn bad_selection() {
     .await
     .unwrap_err();
     let err_string = err.to_string();
-    assert_contains!(&err_string, "Internal error: Invalid ParquetAccessPlan 
Selection. Row group 0 has 5 rows but selection only specifies 4 rows");
+    assert_contains!(
+        &err_string,
+        "Row group 0 has 5 rows but selection only specifies 4 rows."
+    );
 }
 
 /// Return a RowSelection of 1 rows from a row group of 5 rows
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs 
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index ffe0ba021e..2d0e02719c 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -70,7 +70,7 @@ use arrow::{
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::{
     common::cast::as_int64_array,
-    common::{arrow_datafusion_err, internal_err, DFSchemaRef},
+    common::{arrow_datafusion_err, DFSchemaRef},
     error::{DataFusionError, Result},
     execution::{
         context::{QueryPlanner, SessionState, TaskContext},
@@ -91,7 +91,7 @@ use datafusion::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ScalarValue;
+use datafusion_common::{assert_eq_or_internal_err, assert_or_internal_err, 
ScalarValue};
 use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr};
 use datafusion_optimizer::optimizer::ApplyOrder;
 use datafusion_optimizer::AnalyzerRule;
@@ -585,9 +585,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
             kind,
         }) = self.invariant_mock.clone()
         {
-            if should_fail_invariant && check == kind {
-                return internal_err!("node fails check, such as improper 
inputs");
-            }
+            assert_or_internal_err!(
+                !(should_fail_invariant && check == kind),
+                "node fails check, such as improper inputs"
+            );
         }
         Ok(())
     }
@@ -733,9 +734,11 @@ impl ExecutionPlan for TopKExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        if 0 != partition {
-            return internal_err!("TopKExec invalid partition {partition}");
-        }
+        assert_eq_or_internal_err!(
+            partition,
+            0,
+            "TopKExec invalid partition {partition}"
+        );
 
         Ok(Box::pin(TopKReader {
             input: self.input.execute(partition, context)?,
diff --git a/datafusion/datasource-parquet/src/access_plan.rs 
b/datafusion/datasource-parquet/src/access_plan.rs
index 0c30f3ff85..295ecea946 100644
--- a/datafusion/datasource-parquet/src/access_plan.rs
+++ b/datafusion/datasource-parquet/src/access_plan.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::{internal_err, Result};
+use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result};
 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 use parquet::file::metadata::RowGroupMetaData;
 
@@ -269,13 +269,13 @@ impl ParquetAccessPlan {
                 .sum::<usize>();
 
             let row_group_row_count = rg_meta.num_rows();
-            if rows_in_selection as i64 != row_group_row_count {
-                return internal_err!(
-                    "Invalid ParquetAccessPlan Selection. Row group {idx} has 
{row_group_row_count} rows \
+            assert_eq_or_internal_err!(
+                rows_in_selection as i64,
+                row_group_row_count,
+                "Invalid ParquetAccessPlan Selection. Row group {idx} has 
{row_group_row_count} rows \
                     but selection only specifies {rows_in_selection} rows. \
                     Selection: {selection:?}"
-                );
-            }
+            );
         }
 
         let total_selection: RowSelection = self
@@ -482,7 +482,10 @@ mod test {
             .unwrap_err()
             .to_string();
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
-        assert_contains!(err, "Internal error: Invalid ParquetAccessPlan 
Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
+        assert_contains!(
+            err,
+            "Row group 1 has 20 rows but selection only specifies 12 rows"
+        );
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index c8904a55c2..a38cd7a75b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -28,8 +28,8 @@ use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
 use datafusion_common::{
-    assert_eq_or_internal_err, internal_err, plan_err, qualified_name, Column, 
DFSchema,
-    DataFusionError, Result,
+    assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
+    qualified_name, Column, DFSchema, DataFusionError, Result,
 };
 use datafusion_expr::expr::WindowFunction;
 use datafusion_expr::expr_rewriter::replace_col;
@@ -1372,9 +1372,7 @@ fn insert_below(
     })?;
 
     // make sure we did the actual replacement
-    if new_child.is_some() {
-        return internal_err!("node had no  inputs");
-    }
+    assert_or_internal_err!(new_child.is_none(), "node had no inputs");
 
     Ok(transformed_plan)
 }
diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs
index 2aa12dd350..da4cd942cc 100644
--- a/datafusion/proto/src/common.rs
+++ b/datafusion/proto/src/common.rs
@@ -15,14 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::{internal_datafusion_err, internal_err, Result};
+use datafusion_common::{
+    assert_eq_or_internal_err, internal_datafusion_err, DataFusionError, 
Result,
+};
 
 pub(crate) fn str_to_byte(s: &String, description: &str) -> Result<u8> {
-    if s.len() != 1 {
-        return internal_err!(
-            "Invalid CSV {description}: expected single character, got {s}"
-        );
-    }
+    assert_eq_or_internal_err!(
+        s.len(),
+        1,
+        "Invalid CSV {description}: expected single character, got {s}"
+    );
     Ok(s.as_bytes()[0])
 }
 
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index 9644c9f69f..84d6688b78 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -37,8 +37,8 @@ use arrow::datatypes::{DataType, Field, Schema, 
SchemaBuilder, SchemaRef};
 use datafusion_catalog::cte_worktable::CteWorkTable;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
-    context, internal_datafusion_err, internal_err, not_impl_err, plan_err, 
Result,
-    TableReference, ToDFSchema,
+    assert_or_internal_err, context, internal_datafusion_err, internal_err, 
not_impl_err,
+    plan_err, DataFusionError, Result, TableReference, ToDFSchema,
 };
 use datafusion_datasource::file_format::FileFormat;
 use datafusion_datasource::file_format::{
@@ -776,11 +776,10 @@ impl AsLogicalPlan for LogicalPlanNode {
                 builder.build()
             }
             LogicalPlanType::Union(union) => {
-                if union.inputs.len() < 2 {
-                    return internal_err!(
-                        "Protobuf deserialization error, Union was require at 
least two input."
-                    );
-                }
+                assert_or_internal_err!(
+                    union.inputs.len() >= 2,
+                    "Protobuf deserialization error, Union requires at least 
two inputs."
+                );
                 let (first, rest) = union.inputs.split_first().unwrap();
                 let mut builder = LogicalPlanBuilder::from(
                     first.try_into_logical_plan(ctx, extension_codec)?,
diff --git a/datafusion/pruning/src/pruning_predicate.rs 
b/datafusion/pruning/src/pruning_predicate.rs
index d0c8b52fc4..527a0e0946 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -36,12 +36,12 @@ use log::{debug, trace};
 
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::TransformedResult;
+use datafusion_common::{assert_eq_or_internal_err, Column, DFSchema};
 use datafusion_common::{
-    internal_datafusion_err, internal_err, plan_datafusion_err, plan_err,
+    internal_datafusion_err, plan_datafusion_err, plan_err,
     tree_node::{Transformed, TreeNode},
-    ScalarValue,
+    DataFusionError, ScalarValue,
 };
-use datafusion_common::{Column, DFSchema};
 use datafusion_expr_common::operator::Operator;
 use datafusion_physical_expr::utils::{collect_columns, Guarantee, 
LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
@@ -919,13 +919,13 @@ fn build_statistics_record_batch<S: PruningStatistics + 
?Sized>(
         };
         let array = array.unwrap_or_else(|| new_null_array(data_type, 
num_containers));
 
-        if num_containers != array.len() {
-            return internal_err!(
-                "mismatched statistics length. Expected {}, got {}",
-                num_containers,
-                array.len()
-            );
-        }
+        assert_eq_or_internal_err!(
+            num_containers,
+            array.len(),
+            "mismatched statistics length. Expected {}, got {}",
+            num_containers,
+            array.len()
+        );
 
         // cast statistics array to required data type (e.g. parquet
         // provides timestamp statistics as "Int64")
diff --git a/datafusion/spark/src/function/datetime/make_interval.rs 
b/datafusion/spark/src/function/datetime/make_interval.rs
index 8e3169556b..d510eacb9a 100644
--- a/datafusion/spark/src/function/datetime/make_interval.rs
+++ b/datafusion/spark/src/function/datetime/make_interval.rs
@@ -238,7 +238,9 @@ mod tests {
     use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray};
     use arrow::datatypes::Field;
     use datafusion_common::config::ConfigOptions;
-    use datafusion_common::{internal_datafusion_err, internal_err, Result};
+    use datafusion_common::{
+        assert_eq_or_internal_err, internal_datafusion_err, internal_err, 
Result,
+    };
 
     use super::*;
     fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> 
Result<ArrayRef> {
@@ -533,34 +535,33 @@ mod tests {
                     .ok_or_else(|| {
                         internal_datafusion_err!("expected 
IntervalMonthDayNanoArray")
                     })?;
-                if arr.len() != number_rows {
-                    return internal_err!(
-                        "expected array length {number_rows}, got {}",
-                        arr.len()
-                    );
-                }
+                assert_eq_or_internal_err!(
+                    arr.len(),
+                    number_rows,
+                    "expected array length {number_rows}"
+                );
                 for i in 0..number_rows {
                     let iv = arr.value(i);
-                    if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
-                        return internal_err!(
-                            "row {i}: expected (0,0,0), got ({},{},{})",
-                            iv.months,
-                            iv.days,
-                            iv.nanoseconds
-                        );
-                    }
-                }
-            }
-            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) 
=> {
-                if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) {
-                    return internal_err!(
-                        "expected scalar 0s, got ({},{},{})",
+                    assert_eq_or_internal_err!(
+                        (iv.months, iv.days, iv.nanoseconds),
+                        (0, 0, 0),
+                        "row {i}: expected (0,0,0), got ({},{},{})",
                         iv.months,
                         iv.days,
                         iv.nanoseconds
                     );
                 }
             }
+            ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) 
=> {
+                assert_eq_or_internal_err!(
+                    (iv.months, iv.days, iv.nanoseconds),
+                    (0, 0, 0),
+                    "expected scalar 0s, got ({},{},{})",
+                    iv.months,
+                    iv.days,
+                    iv.nanoseconds
+                );
+            }
             other => {
                 return internal_err!(
                     "expected Array or Scalar IntervalMonthDayNano, got 
{other:?}"
diff --git a/datafusion/spark/src/function/math/modulus.rs 
b/datafusion/spark/src/function/math/modulus.rs
index b894c8cad5..aa66b179e2 100644
--- a/datafusion/spark/src/function/math/modulus.rs
+++ b/datafusion/spark/src/function/math/modulus.rs
@@ -19,7 +19,7 @@ use arrow::compute::kernels::numeric::add;
 use arrow::compute::kernels::{cmp::lt, numeric::rem, zip::zip};
 use arrow::datatypes::DataType;
 use datafusion_common::{
-    assert_eq_or_internal_err, internal_err, DataFusionError, Result, 
ScalarValue,
+    assert_eq_or_internal_err, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
@@ -85,9 +85,11 @@ impl ScalarUDFImpl for SparkMod {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        if arg_types.len() != 2 {
-            return internal_err!("mod expects exactly two arguments");
-        }
+        assert_eq_or_internal_err!(
+            arg_types.len(),
+            2,
+            "mod expects exactly two arguments"
+        );
 
         // Return the same type as the first argument for simplicity
         // Arrow's rem function handles type promotion internally
@@ -133,9 +135,11 @@ impl ScalarUDFImpl for SparkPmod {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        if arg_types.len() != 2 {
-            return internal_err!("pmod expects exactly two arguments");
-        }
+        assert_eq_or_internal_err!(
+            arg_types.len(),
+            2,
+            "pmod expects exactly two arguments"
+        );
 
         // Return the same type as the first argument for simplicity
         // Arrow's rem function handles type promotion internally
diff --git a/datafusion/spark/src/function/math/rint.rs 
b/datafusion/spark/src/function/math/rint.rs
index aae5455df0..40dac9cb31 100644
--- a/datafusion/spark/src/function/math/rint.rs
+++ b/datafusion/spark/src/function/math/rint.rs
@@ -84,9 +84,7 @@ impl ScalarUDFImpl for SparkRint {
 }
 
 pub fn spark_rint(args: &[ArrayRef]) -> Result<ArrayRef> {
-    if args.len() != 1 {
-        assert_eq_or_internal_err!(args.len(), 1, "`rint` expects exactly one 
argument");
-    }
+    assert_eq_or_internal_err!(args.len(), 1, "`rint` expects exactly one 
argument");
 
     let array: &dyn Array = args[0].as_ref();
     match args[0].data_type() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to