alamb commented on code in PR #8143:
URL: https://github.com/apache/arrow-datafusion/pull/8143#discussion_r1393350639
##########
datafusion/common/src/dfschema.rs:
##########
@@ -427,48 +443,16 @@ impl DFSchema {
self_fields.zip(other_fields).all(|(f1, f2)| {
f1.qualifier() == f2.qualifier()
&& f1.name() == f2.name()
- && Self::datatype_is_semantically_equal(f1.data_type(),
f2.data_type())
+ && Self::datatype_is_logically_equal(f1.data_type(),
f2.data_type())
})
}
/// Checks if two [`DataType`]s are logically equal. This is a notably
weaker constraint
/// than datatype_is_semantically_equal in that a Dictionary<K,V> type is
logically
/// equal to a plain V type, but not semantically equal. Dictionary<K1,
V1> is also
/// logically equal to Dictionary<K2, V1>.
- fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
- // check nested fields
- match (dt1, dt2) {
- (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
- v1.as_ref() == v2.as_ref()
- }
- (DataType::Dictionary(_, v1), othertype) => v1.as_ref() ==
othertype,
- (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() ==
othertype,
- (DataType::List(f1), DataType::List(f2))
- | (DataType::LargeList(f1), DataType::LargeList(f2))
- | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
- | (DataType::Map(f1, _), DataType::Map(f2, _)) => {
- Self::field_is_logically_equal(f1, f2)
- }
- (DataType::Struct(fields1), DataType::Struct(fields2)) => {
- let iter1 = fields1.iter();
- let iter2 = fields2.iter();
- fields1.len() == fields2.len() &&
- // all fields have to be the same
- iter1
- .zip(iter2)
- .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
- }
- (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
- let iter1 = fields1.iter();
- let iter2 = fields2.iter();
- fields1.len() == fields2.len() &&
- // all fields have to be the same
- iter1
- .zip(iter2)
- .all(|((t1, f1), (t2, f2))| t1 == t2 &&
Self::field_is_logically_equal(f1, f2))
- }
- _ => dt1 == dt2,
- }
+ fn datatype_is_logically_equal(dt1: &LogicalType, dt2: &LogicalType) ->
bool {
+ dt1 == dt2
Review Comment:
that is certainly nicer
##########
datafusion/common/src/logical_type.rs:
##########
@@ -0,0 +1,411 @@
+use std::{borrow::Cow, fmt::Display, sync::Arc};
+
+use crate::error::Result;
+use arrow_schema::{DataType, Field, IntervalUnit, TimeUnit};
+
+#[derive(Clone, Debug)]
+pub enum LogicalType {
Review Comment:
Eventually we should add doc comments here, but it also makes sense to avoid
overdoing it on an RFC / draft.
##########
datafusion/common/src/logical_type.rs:
##########
@@ -0,0 +1,411 @@
+use std::{borrow::Cow, fmt::Display, sync::Arc};
+
+use crate::error::Result;
+use arrow_schema::{DataType, Field, IntervalUnit, TimeUnit};
+
+#[derive(Clone, Debug)]
+pub enum LogicalType {
+ Null,
+ Boolean,
+ Int8,
+ Int16,
+ Int32,
+ Int64,
+ UInt8,
+ UInt16,
+ UInt32,
+ UInt64,
+ Float16,
+ Float32,
+ Float64,
+ String,
+ LargeString,
+ Date32,
+ Date64,
+ Time32(TimeUnit),
+ Time64(TimeUnit),
+ Timestamp(TimeUnit, Option<Arc<str>>),
+ Duration(TimeUnit),
+ Interval(IntervalUnit),
+ Binary,
+ FixedSizeBinary(i32),
+ LargeBinary,
+ Utf8,
+ LargeUtf8,
+ List(Box<LogicalType>),
+ FixedSizeList(Box<LogicalType>, i32),
+ LargeList(Box<LogicalType>),
+ Struct(Fields),
+ Map(NamedLogicalTypeRef, bool),
+ // union
+ Decimal128(u8, i8),
+ Decimal256(u8, i8),
+ Extension(ExtensionTypeRef),
+}
+
+impl PartialEq for LogicalType {
+ fn eq(&self, other: &Self) -> bool {
+ self.type_signature() == other.type_signature()
+ }
+}
+
+impl Eq for LogicalType {}
+
+impl std::hash::Hash for LogicalType {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.type_signature().hash(state)
+ }
+}
+
+impl Display for LogicalType {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.display_name())
+ }
+}
+
+pub type Fields = Arc<[NamedLogicalTypeRef]>;
+pub type NamedLogicalTypeRef = Arc<NamedLogicalType>;
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct NamedLogicalType {
+ name: String,
+ data_type: LogicalType,
+}
+
+impl NamedLogicalType {
+ pub fn new(name: impl Into<String>, data_type: LogicalType) -> Self {
+ Self {
+ name: name.into(),
+ data_type,
+ }
+ }
+
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ pub fn data_type(&self) -> &LogicalType {
+ &self.data_type
+ }
+}
+
+pub type OwnedTypeSignature = TypeSignature<'static>;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct TypeSignature<'a> {
+ // **func_name**(p1, p2)
+ name: Cow<'a, str>,
+ // func_name(**p1**, **p2**)
+ params: Vec<Cow<'a, str>>,
+}
+
+impl<'a> TypeSignature<'a> {
+ pub fn new(name: impl Into<Cow<'a, str>>) -> Self {
+ Self::new_with_params(name, vec![])
+ }
+
+ pub fn new_with_params(
+ name: impl Into<Cow<'a, str>>,
+ params: Vec<Cow<'a, str>>,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ params,
+ }
+ }
+
+ pub fn to_owned_type_signature(&self) -> OwnedTypeSignature {
+ OwnedTypeSignature {
+ name: self.name.to_string().into(),
+ params: self.params.iter().map(|p| p.to_string().into()).collect(),
+ }
+ }
+}
+
+pub type ExtensionTypeRef = Arc<dyn ExtensionType + Send + Sync>;
+
+pub trait ExtensionType: std::fmt::Debug {
+ fn display_name(&self) -> &str;
+ fn type_signature(&self) -> TypeSignature;
+ fn physical_type(&self) -> DataType;
+
+ fn is_comparable(&self) -> bool;
+ fn is_orderable(&self) -> bool;
+ fn is_numeric(&self) -> bool;
+}
+
+pub trait TypeManager {
+ fn register_data_type(
+ &mut self,
+ signature: impl Into<TypeSignature<'static>>,
+ extension_type: ExtensionTypeRef,
+ ) -> Result<()>;
+
+ fn data_type(&self, signature: &TypeSignature) ->
Result<Option<ExtensionTypeRef>>;
+}
+
+impl ExtensionType for LogicalType {
+ fn display_name(&self) -> &str {
+ match self {
+ Self::Null => "NULL",
+ Self::Boolean => "BOOLEAN",
+ Self::Int8 => "INT8",
+ Self::Int16 => "INT16",
+ Self::Int32 => "INT32",
+ Self::Int64 => "INT64",
+ Self::UInt8 => "UINT8",
+ Self::UInt16 => "UINT16",
+ Self::UInt32 => "UINT32",
+ Self::UInt64 => "UINT64",
+ Self::Float16 => "FLOAT16",
+ Self::Float32 => "Float16",
+ Self::Float64 => "Float64",
+ Self::String => "String",
+ Self::LargeString => "LargeString",
+ Self::Date32 => "Date32",
+ Self::Date64 => "Date64",
+ Self::Time32(_) => "Time32",
+ Self::Time64(_) => "Time64",
+ Self::Timestamp(_, _) => "Timestamp",
+ Self::Duration(_) => "Duration",
+ Self::Interval(_) => "Interval",
+ Self::Binary => "Binary",
+ Self::FixedSizeBinary(_) => "FixedSizeBinary",
+ Self::LargeBinary => "LargeBinary",
+ Self::Utf8 => "Utf8",
+ Self::LargeUtf8 => "LargeUtf8",
+ Self::List(_) => "List",
+ Self::FixedSizeList(_, _) => "FixedSizeList",
+ Self::LargeList(_) => "LargeList",
+ Self::Struct(_) => "Struct",
+ Self::Map(_, _) => "Map",
+ Self::Decimal128(_, _) => "Decimal128",
+ Self::Decimal256(_, _) => "Decimal256",
+ Self::Extension(ext) => ext.display_name(),
+ }
+ }
+
+ fn type_signature(&self) -> TypeSignature {
+ match self {
+ Self::Boolean => TypeSignature::new("boolean"),
+ Self::Int32 => TypeSignature::new("int32"),
+ Self::Int64 => TypeSignature::new("int64"),
+ Self::UInt64 => TypeSignature::new("uint64"),
+ Self::Float32 => TypeSignature::new("float32"),
+ Self::Float64 => TypeSignature::new("float64"),
+ Self::String => TypeSignature::new("string"),
+ Self::Timestamp(tu, zone) => {
+ let tu = match tu {
+ TimeUnit::Second => "second",
+ TimeUnit::Millisecond => "millisecond",
+ TimeUnit::Microsecond => "microsecond",
+ TimeUnit::Nanosecond => "nanosecond",
+ };
+
+ let params = if let Some(zone) = zone {
+ vec![tu.into(), zone.as_ref().into()]
+ } else {
+ vec![tu.into()]
+ };
+
+ TypeSignature::new_with_params("timestamp", params)
+ }
+ Self::Binary => TypeSignature::new("binary"),
+ Self::Utf8 => TypeSignature::new("string"),
+ Self::Extension(ext) => ext.type_signature(),
+ Self::Struct(fields) => {
+ let params = fields.iter().map(|f| f.name().into()).collect();
+ TypeSignature::new_with_params("struct", params)
+ }
+ other => panic!("not implemented: {other:?}"),
+ }
+ }
+
+ fn physical_type(&self) -> DataType {
+ match self {
+ Self::Boolean => DataType::Boolean,
+ Self::Int32 => DataType::Int32,
+ Self::Int64 => DataType::Int64,
+ Self::UInt64 => DataType::UInt64,
+ Self::Float32 => DataType::Float32,
+ Self::Float64 => DataType::Float64,
+ Self::String => DataType::Utf8,
+ Self::Timestamp(tu, zone) => DataType::Timestamp(tu.clone(),
zone.clone()),
+ Self::Binary => DataType::Binary,
+ Self::Utf8 => DataType::Utf8,
+ Self::Extension(ext) => ext.physical_type(),
+ Self::Struct(fields) => {
+ let fields = fields
+ .iter()
+ .map(|f| {
+ let name = f.name();
+ let data_type = f.physical_type();
+ Arc::new(Field::new(name, data_type, true))
+ })
+ .collect::<Vec<_>>();
+ DataType::Struct(fields.into())
+ }
+ other => panic!("not implemented {other:?}"),
Review Comment:
Is the idea that
[`DataType::Dictionary`](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Dictionary)
and
[`DataType::RunEndEncoded`](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.RunEndEncoded)
would also be included here? If so, that makes sense.
##########
datafusion/common/src/dfschema.rs:
##########
@@ -715,33 +715,61 @@ impl ExprSchema for DFSchema {
}
/// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFField {
/// Optional qualifier (usually a table or relation name)
qualifier: Option<OwnedTableReference>,
/// Arrow field definition
- field: FieldRef,
+ // field: FieldRef,
+ name: String,
+ data_type: LogicalType,
+ nullable: bool,
+ /// A map of key-value pairs containing additional custom meta data.
+ metadata: HashMap<String, String>,
Review Comment:
Maybe we could store the metadata in a `BTreeMap` to avoid sorting the entries each time 🤔
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -253,6 +253,7 @@ pub struct ListingOptions {
pub format: Arc<dyn FileFormat>,
/// The expected partition column names in the folder structure.
/// See [Self::with_table_partition_cols] for details
+ /// TODO this maybe LogicalType
Review Comment:
I think one use case is to use dictionary encoding for the partition columns
to minimize the overhead of creating such columns. As long as they can be
represented / specified with `LogicalType`, I think it is a good idea to try
changing this too.
##########
datafusion/common/src/dfschema.rs:
##########
@@ -715,33 +715,61 @@ impl ExprSchema for DFSchema {
}
/// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFField {
/// Optional qualifier (usually a table or relation name)
qualifier: Option<OwnedTableReference>,
/// Arrow field definition
- field: FieldRef,
+ // field: FieldRef,
+ name: String,
+ data_type: LogicalType,
+ nullable: bool,
+ /// A map of key-value pairs containing additional custom meta data.
+ metadata: HashMap<String, String>,
Review Comment:
Maybe we could store the metadata in a `BTreeMap` to avoid sorting the entries each time 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]