eejbyfeldt commented on code in PR #2037:
URL: https://github.com/apache/datafusion-comet/pull/2037#discussion_r2239253795


##########
native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Schema};
+use datafusion::common::Result;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::physical_expr::PhysicalExpr;
+use std::any::Any;
+use std::fmt::{Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct MonotonicallyIncreasingId {
+    initial_offset: i64,
+    current_offset: AtomicI64,
+}
+
+impl MonotonicallyIncreasingId {
+    pub fn new(offset: i64) -> Self {
+        Self {
+            initial_offset: offset,
+            current_offset: AtomicI64::new(offset),
+        }
+    }
+}
+
+pub fn monotonically_increasing_id(offset: i64) -> Arc<dyn PhysicalExpr> {
+    Arc::new(MonotonicallyIncreasingId::new(offset))
+}
+
+impl Display for MonotonicallyIncreasingId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "monotonically_increasing_id()")
+    }
+}
+
+impl PartialEq for MonotonicallyIncreasingId {
+    fn eq(&self, other: &Self) -> bool {
+        self.initial_offset == other.initial_offset
+    }
+}
+
+impl Eq for MonotonicallyIncreasingId {}
+
+impl Hash for MonotonicallyIncreasingId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.children().hash(state);
+    }
+}
+
+impl PhysicalExpr for MonotonicallyIncreasingId {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let start = self.current_offset.load(Ordering::Relaxed);
+        let end = start + batch.num_rows() as i64;
+        let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
+        self.current_offset
+            .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);
+        Ok(ColumnarValue::Array(array_ref))
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(MonotonicallyIncreasingId::new(
+            self.initial_offset,
+        )))

Review Comment:
   Why is this not an acceptable implementation? 
   ```suggestion
           Ok(self)
   ```
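
   For context, the full method under this suggestion would read roughly as
   below (a sketch, not part of the PR). One behavioral difference worth
   noting: `Ok(self)` preserves the runtime `current_offset` counter, whereas
   constructing a new instance resets it to `initial_offset`.
   ```rust
   fn with_new_children(
       self: Arc<Self>,
       _children: Vec<Arc<dyn PhysicalExpr>>,
   ) -> Result<Arc<dyn PhysicalExpr>> {
       // No children to replace, so the existing Arc can be returned as-is;
       // Arc<Self> coerces to Arc<dyn PhysicalExpr> in return position.
       Ok(self)
   }
   ```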



##########
native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Schema};
+use datafusion::common::Result;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::physical_expr::PhysicalExpr;
+use std::any::Any;
+use std::fmt::{Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct MonotonicallyIncreasingId {
+    initial_offset: i64,
+    current_offset: AtomicI64,
+}
+
+impl MonotonicallyIncreasingId {
+    pub fn new(offset: i64) -> Self {
+        Self {
+            initial_offset: offset,
+            current_offset: AtomicI64::new(offset),
+        }
+    }
+}
+
+pub fn monotonically_increasing_id(offset: i64) -> Arc<dyn PhysicalExpr> {
+    Arc::new(MonotonicallyIncreasingId::new(offset))
+}
+
+impl Display for MonotonicallyIncreasingId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "monotonically_increasing_id()")
+    }
+}
+
+impl PartialEq for MonotonicallyIncreasingId {
+    fn eq(&self, other: &Self) -> bool {
+        self.initial_offset == other.initial_offset
+    }
+}
+
+impl Eq for MonotonicallyIncreasingId {}
+
+impl Hash for MonotonicallyIncreasingId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.children().hash(state);
+    }

Review Comment:
   Since this expr has no children, I believe this is the same as doing
   nothing. Should this instead be:
   ```suggestion
       fn hash<H: Hasher>(&self, state: &mut H) {
           self.initial_offset.hash(state);
       }
   ```
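
   For illustration, a hypothetical check (not in the PR) showing why this
   matters: under the original `impl`, expressions with different offsets
   hash identically, because `children()` is always empty.
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};

   fn hash_of(expr: &MonotonicallyIncreasingId) -> u64 {
       let mut h = DefaultHasher::new();
       expr.hash(&mut h);
       h.finish()
   }

   // With the original implementation these are equal, even though the
   // expressions compare unequal via PartialEq:
   // hash_of(&MonotonicallyIncreasingId::new(0))
   //     == hash_of(&MonotonicallyIncreasingId::new(1 << 33))
   ```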



##########
native/core/src/execution/planner.rs:
##########
@@ -798,6 +799,13 @@ impl PhysicalPlanner {
                 let seed = expr.seed.wrapping_add(self.partition.into());
                 Ok(Arc::new(RandnExpr::new(seed)))
             }
+            ExprStruct::SparkPartitionId(_) => Ok(Arc::new(DataFusionLiteral::new(
+                ScalarValue::Int32(Some(self.partition)),
+            ))),
+            ExprStruct::MonotonicallyIncreasingId(_) => {
+                let offset = (self.partition as i64) << 33;
+                Ok(Arc::new(MonotonicallyIncreasingId::new(offset)))

Review Comment:
   Feels a bit like the `(self.partition as i64) << 33` should be moved into
   `MonotonicallyIncreasingId::new`, since that logic belongs to the
   `MonotonicallyIncreasingId` type rather than to the planner.
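
   A sketch of the suggested change (illustrative, not from the PR), with
   the constructor taking the partition number directly. Spark puts the
   partition ID in the upper 31 bits of the 64-bit ID and the per-partition
   record number in the lower 33 bits, which is where the shift comes from:
   ```rust
   impl MonotonicallyIncreasingId {
       /// Builds the expression for a given Spark partition; the starting
       /// offset places the partition ID in the upper 31 bits, leaving the
       /// lower 33 bits for the per-partition record number.
       pub fn new(partition: i32) -> Self {
           let offset = (partition as i64) << 33;
           Self {
               initial_offset: offset,
               current_offset: AtomicI64::new(offset),
           }
       }
   }
   ```
   The planner arm would then shrink to
   `Ok(Arc::new(MonotonicallyIncreasingId::new(self.partition)))`.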



##########
native/spark-expr/src/nondetermenistic_funcs/monotonically_increasing_id.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Schema};
+use datafusion::common::Result;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::physical_expr::PhysicalExpr;
+use std::any::Any;
+use std::fmt::{Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct MonotonicallyIncreasingId {
+    initial_offset: i64,
+    current_offset: AtomicI64,
+}
+
+impl MonotonicallyIncreasingId {
+    pub fn new(offset: i64) -> Self {
+        Self {
+            initial_offset: offset,
+            current_offset: AtomicI64::new(offset),
+        }
+    }
+}
+
+pub fn monotonically_increasing_id(offset: i64) -> Arc<dyn PhysicalExpr> {
+    Arc::new(MonotonicallyIncreasingId::new(offset))
+}
+
+impl Display for MonotonicallyIncreasingId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "monotonically_increasing_id()")
+    }
+}
+
+impl PartialEq for MonotonicallyIncreasingId {
+    fn eq(&self, other: &Self) -> bool {
+        self.initial_offset == other.initial_offset
+    }
+}
+
+impl Eq for MonotonicallyIncreasingId {}
+
+impl Hash for MonotonicallyIncreasingId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.children().hash(state);
+    }
+}
+
+impl PhysicalExpr for MonotonicallyIncreasingId {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let start = self.current_offset.load(Ordering::Relaxed);
+        let end = start + batch.num_rows() as i64;
+        let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
+        self.current_offset
+            .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);

Review Comment:
   Feels like this should be 
   ```suggestion
           let start = self
               .current_offset
               .fetch_add(batch.num_rows() as i64, Ordering::Relaxed);
           let end = start + batch.num_rows() as i64;
           let array_ref = Arc::new(Int64Array::from_iter_values(start..end));
   ```
   so that multiple threads cannot end up with the same start value. Maybe
   that does not happen in the current usage, but it also seems simpler this
   way.
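
   For illustration, a standalone sketch (not from the PR) of why `fetch_add`
   closes the race: it returns the previous value and performs the addition
   in a single atomic step, so concurrent callers get disjoint
   `[start, end)` ranges.
   ```rust
   use std::sync::atomic::{AtomicI64, Ordering};

   fn main() {
       let counter = AtomicI64::new(0);
       // Returns the value *before* the addition, in one atomic operation;
       // a concurrent fetch_add would observe 1024 as its start instead.
       let start = counter.fetch_add(1024, Ordering::Relaxed);
       assert_eq!(start, 0);
       assert_eq!(counter.load(Ordering::Relaxed), 1024);
   }
   ```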



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

