This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new d72e54c2a feat: rand expression support (#1199) d72e54c2a is described below commit d72e54c2a4283465c2ea1f6af2417fd25fac896e Author: Artem Kupchinskiy <akupchins...@proton.me> AuthorDate: Thu Jun 26 03:34:28 2025 +0400 feat: rand expression support (#1199) --- native/Cargo.lock | 311 ++++++++++----------- native/core/src/execution/jni_api.rs | 5 +- native/core/src/execution/planner.rs | 17 +- native/core/src/parquet/mod.rs | 2 +- native/proto/src/proto/expr.proto | 1 + native/spark-expr/src/lib.rs | 2 + .../spark-expr/src/nondetermenistic_funcs/mod.rs | 20 ++ .../spark-expr/src/nondetermenistic_funcs/rand.rs | 274 ++++++++++++++++++ .../org/apache/comet/serde/QueryPlanSerde.scala | 10 + .../org/apache/comet/CometExpressionSuite.scala | 20 ++ 10 files changed, 492 insertions(+), 170 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 87b42e427..801c302f6 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -13,9 +13,9 @@ dependencies = [ [[package]] name = "adler2" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" [[package]] name = "ahash" @@ -169,7 +169,7 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown 0.15.3", + "hashbrown 0.15.4", "num", ] @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "assertables" -version = "9.5.5" +version = "9.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46109705783fa5733709a155971ad89cdd188d45b7e20fba7906f0d6b4f864e3" +checksum = "54caed2f33dafa664c6b8b6d4064d5045bdfd43517b0d31758943df205e63226" [[package]] name = "async-trait" @@ -351,7 +351,7 @@ checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -371,15 +371,15 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.6.3" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a18fd934af6ae7ca52410d4548b98eb895aab0f1ea417d168d85db1434a141" +checksum = "455e9fb7743c6f6267eb2830ccc08686fbb3d13c9a689369562fd4d4ef9ea462" dependencies = [ "aws-credential-types", "aws-runtime", @@ -442,9 +442,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.7" +version = "1.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c4063282c69991e57faab9e5cb21ae557e59f5b0fb285c196335243df8dc25c" +checksum = "4f6c68419d8ba16d9a7463671593c54f81ba58cab466e9b759418da606dcc2e2" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -466,9 +466,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.72.0" +version = "1.73.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13118ad30741222f67b1a18e5071385863914da05124652b38e172d6d3d9ce31" +checksum = "b2ac1674cba7872061a29baaf02209fefe499ff034dfd91bd4cc59e4d7741489" dependencies = [ "aws-credential-types", "aws-runtime", @@ -488,9 +488,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.73.0" +version = "1.74.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f879a8572b4683a8f84f781695bebf2f25cf11a81a2693c31fc0e0215c2c1726" +checksum = "3a6a22f077f5fd3e3c0270d4e1a110346cddf6769e9433eb9e6daceb4ca3b149" dependencies = [ "aws-credential-types", "aws-runtime", @@ -510,9 +510,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.73.0" +version = "1.75.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e9c3c24e36183e2f698235ed38dcfbbdff1d09b9232dc866c4be3011e0b47e" +checksum = "e3258fa707f2f585ee3049d9550954b959002abd59176975150a01d5cf38ae3f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -533,9 +533,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.2" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3734aecf9ff79aa401a6ca099d076535ab465ff76b46440cf567c8e70b65dc13" +checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -586,9 +586,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "073d330f94bdf1f47bb3e0f5d45dda1e372a54a553c39ab6e9646902c8c81594" +checksum = "7f491388e741b7ca73b24130ff464c1478acc34d5b331b7dd0a2ee4643595a15" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -609,9 +609,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.3" +version = "0.61.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92144e45819cae7dc62af23eac5a038a58aa544432d2102609654376a900bd07" +checksum = "a16e040799d29c17412943bdbf488fd75db04112d0c0d4b9290bacf5ae0014b9" dependencies = [ "aws-smithy-types", ] @@ -661,9 +661,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e5d9e3a80a18afa109391fb5ad09c3daf887b516c6fd805a157c6ea7994a57" +checksum = "bd8531b6d8882fd8f48f82a9754e682e29dd44cff27154af51fa3eb730f59efb" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -678,9 +678,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40076bd09fadbc12d5e026ae080d0930defa606856186e31d83ccc6a255eeaf3" +checksum = "d498595448e43de7f4296b7b7a18a8a02c61ec9349128c80a368f7c3b4ab11a8" dependencies = [ "base64-simd", "bytes", @@ -701,9 +701,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.9" +version = "0.60.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +checksum = "3db87b96cb1b16c024980f133968d52882ca0daaee3a086c6decc500f6c99728" dependencies = [ "xmlparser", ] @@ -807,7 +807,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.101", + "syn 2.0.104", "which", ] @@ -877,15 +877,15 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.18.1" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" [[package]] name = "bytemuck" -version = "1.23.0" +version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9134a6ef01ce4b366b50689c94f82c14bc72bc5d0386829828a2e2752ef7958c" +checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422" [[package]] name = "byteorder" @@ -917,9 +917,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.26" +version = "1.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "956a5e21988b87f372569b66183b78babf23ebc2e744b733e4350a752c4dafac" +checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" dependencies = [ "jobserver", "libc", @@ -943,9 +943,9 @@ dependencies = [ [[package]] name = "cfg-if" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" [[package]] name = "cfg_aliases" @@ -1027,18 +1027,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.39" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd60e63e9be68e5fb56422e397cf9baddded06dae1d2e523401542383bc72a9f" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.39" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89cc6392a1f72bbeb820d71f32108f61fdaf18bc526e1d23954168a67759ef51" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" dependencies = [ "anstyle", "clap_lex", @@ -1046,9 +1046,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" [[package]] name = "cmake" @@ -1447,7 +1447,7 @@ dependencies = [ "regex", "thiserror 2.0.12", "tokio", - "twox-hash 2.1.0", + "twox-hash", ] [[package]] @@ -1789,7 +1789,7 @@ checksum = "8dce50e3b637dab0d25d04d2fe79dfdca2b257eabd76790bffd22c7f90d700c8" dependencies = [ "datafusion-expr", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -1829,7 +1829,7 @@ dependencies = [ "itertools 0.14.0", "log", "paste", - "petgraph 0.8.1", + "petgraph 0.8.2", ] [[package]] @@ -2004,7 +2004,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -2036,7 +2036,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -2047,12 +2047,12 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" -version = "0.3.12" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2091,9 +2091,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" dependencies = [ "crc32fast", "libz-rs-sys", @@ -2211,7 +2211,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -2263,7 +2263,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi 0.11.1+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -2335,9 +2335,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.3" +version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" dependencies = [ "allocator-api2", "equivalent", @@ -2352,9 +2352,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f154ce46856750ed433c8649605bf7ed2de3bc35fd9d2a9f30cddd873c80cb08" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" [[package]] name = "hex" @@ -2647,7 +2647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown 0.15.3", + "hashbrown 0.15.4", ] [[package]] @@ -2865,9 +2865,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.172" +version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" [[package]] name = "libloading" @@ -2886,7 +2886,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.0", + "windows-targets 0.53.2", ] [[package]] @@ -2897,9 +2897,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libmimalloc-sys" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec9d6fac27761dabcd4ee73571cdb06b7022dc99089acbe5435691edffaac0f4" +checksum = "bf88cd67e9de251c1781dbe2f641a1a3ad66eaae831b8a2c38fbdc5ddae16d4d" dependencies = [ "cc", "libc", @@ -2993,11 +2993,11 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lz4_flex" -version = "0.11.3" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" dependencies = [ - "twox-hash 1.6.3", + "twox-hash", ] [[package]] @@ -3012,9 +3012,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.4" +version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "memmap2" @@ -3027,19 +3027,13 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.46" +version = "0.1.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995942f432bbb4822a7e9c3faa87a695185b0d09273ba85f097b54f4e458f2af" +checksum = "b1791cbe101e95af5764f06f20f6760521f7158f69dbf9d6baf941ee1bf6bc40" dependencies = [ "libmimalloc-sys", ] -[[package]] -name = "mime" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3048,9 +3042,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", ] @@ -3062,7 +3056,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.59.0", ] @@ -3194,9 +3188,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d94ac16b433c0ccf75326388c893d2835ab7457ea35ab8ba5d745c053ef5fa16" +checksum = "7781f96d79ed0f961a7021424ab01840efbda64ae7a505aaea195efc91eaaec4" dependencies = [ "async-trait", "base64", @@ -3307,7 +3301,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.15.3", + "hashbrown 0.15.4", "lz4_flex", "num", "num-bigint", @@ -3318,7 +3312,7 @@ dependencies = [ "snap", "thrift", "tokio", - "twox-hash 2.1.0", + "twox-hash", "zstd", ] @@ -3361,12 +3355,12 @@ dependencies = [ [[package]] name = "petgraph" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a98c6720655620a521dcc722d0ad66cd8afd5d86e34a89ef691c50b7b24de06" +checksum = "54acf3a685220b533e437e264e4d932cfbdc4cc7ec0cd232ed73c08d03b8a7ca" dependencies = [ "fixedbitset", - "hashbrown 0.15.3", + "hashbrown 0.15.4", "indexmap", "serde", ] @@ -3503,12 +3497,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.33" +version = "0.2.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dee91521343f4c5c6a63edd65e54f31f5c92fe8978c40a4282f8372194c6a7d" +checksum = "061c1221631e079b26479d25bbf2275bfe5917ae8419cd7e34f13bfc2aa7539a" dependencies = [ "proc-macro2", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -3571,7 +3565,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.101", + "syn 2.0.104", "tempfile", ] @@ -3585,7 +3579,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -3668,9 +3662,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee4e529991f949c5e25755532370b8af5d114acae52326361d68d47af64aa842" +checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970" dependencies = [ "cfg_aliases", "libc", @@ -3691,9 +3685,9 @@ dependencies = [ [[package]] name = "r-efi" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" [[package]] name = "rand" @@ -3791,14 +3785,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] name = "redox_syscall" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" dependencies = [ "bitflags 2.9.1", ] @@ -3840,9 +3834,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.19" +version = "0.12.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2f8e5513d63f2e5b386eb5106dc67eaf3f84e95258e210489136b8b92ad6119" +checksum = "eabf4c97d9130e2bf606614eb937e86edac8292eaa6f422f995d7e8de1eb1813" dependencies = [ "base64", "bytes", @@ -3855,11 +3849,8 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "ipnet", "js-sys", "log", - "mime", - "once_cell", "percent-encoding", "pin-project-lite", "quinn", @@ -3908,9 +3899,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.24" +version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" [[package]] name = "rustc-hash" @@ -3961,9 +3952,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.27" +version = "0.23.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" +checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" dependencies = [ "aws-lc-rs", "once_cell", @@ -4115,7 +4106,7 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4201,18 +4192,15 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "slab" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" [[package]] name = "smallvec" -version = "1.15.0" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "snap" @@ -4249,7 +4237,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4325,9 +4313,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.101" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -4351,7 +4339,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4393,7 +4381,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4404,7 +4392,7 @@ checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4559,7 +4547,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4643,13 +4631,13 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1ffbcf9c6f6b99d386e7444eb608ba646ae452a36b39737deb9663b610f662" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4669,21 +4657,11 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "static_assertions", -] - -[[package]] -name = "twox-hash" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908" +checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56" dependencies = [ - "rand 0.8.5", + "rand 0.9.1", ] [[package]] @@ -4715,9 +4693,9 @@ checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-width" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" [[package]] name = "unsafe-any-ors" @@ -4807,9 +4785,9 @@ dependencies = [ [[package]] name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" +version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" @@ -4842,7 +4820,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", "wasm-bindgen-shared", ] @@ -4877,7 +4855,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4988,7 +4966,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -4999,14 +4977,14 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] name = "windows-link" -version = "0.1.1" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] name = "windows-result" @@ -5053,6 +5031,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.2", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -5086,9 +5073,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.0" +version = "0.53.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" +checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" dependencies = [ "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", @@ -5279,28 +5266,28 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", "synstructure", ] [[package]] name = "zerocopy" -version = "0.8.25" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.25" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] @@ -5320,7 +5307,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", "synstructure", ] @@ -5360,7 +5347,7 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.101", + "syn 2.0.104", ] [[package]] diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index ffee9c599..a7ddce34f 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -408,8 +408,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // query plan, we need to defer stream initialization to first time execution. if exec_context.root_op.is_none() { let start = Instant::now(); - let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx)) - .with_exec_id(exec_context_id); + let planner = + PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition) + .with_exec_id(exec_context_id); let (scans, root_op) = planner.create_plan( &exec_context.spark_plan, &mut exec_context.input_sources.clone(), diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 906c58e02..0ce1ea7a9 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -106,7 +106,7 @@ use datafusion_comet_proto::{ use datafusion_comet_spark_expr::{ ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Contains, Correlation, Covariance, CreateNamedStruct, EndsWith, GetArrayStructFields, GetStructField, IfExpr, Like, ListExtract, - NormalizeNaNAndZero, RLike, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, + NormalizeNaNAndZero, RLike, RandExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; use itertools::Itertools; @@ -141,26 +141,29 @@ pub const TEST_EXEC_CONTEXT_ID: i64 = -1; pub struct PhysicalPlanner { // The execution context id of this planner. exec_context_id: i64, + partition: i32, session_ctx: Arc<SessionContext>, } impl Default for PhysicalPlanner { fn default() -> Self { - Self::new(Arc::new(SessionContext::new())) + Self::new(Arc::new(SessionContext::new()), 0) } } impl PhysicalPlanner { - pub fn new(session_ctx: Arc<SessionContext>) -> Self { + pub fn new(session_ctx: Arc<SessionContext>, partition: i32) -> Self { Self { exec_context_id: TEST_EXEC_CONTEXT_ID, session_ctx, + partition, } } pub fn with_exec_id(self, exec_context_id: i64) -> Self { Self { exec_context_id, + partition: self.partition, session_ctx: Arc::clone(&self.session_ctx), } } @@ -801,6 +804,10 @@ impl PhysicalPlanner { expr.legacy_negative_index, ))) } + ExprStruct::Rand(expr) => { + let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(RandExpr::new(child, self.partition))) + } expr => Err(GeneralError(format!("Not implemented: {:?}", expr))), } } @@ -2946,7 +2953,7 @@ mod tests { datafusion_functions_nested::make_array::MakeArray::new(), )); let task_ctx = session_ctx.task_ctx(); - let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + let planner = PhysicalPlanner::new(Arc::from(session_ctx), 0); // Create a plan for // ProjectionExec: expr=[make_array(col_0@0) as col_0] @@ -3062,7 +3069,7 @@ mod tests { fn test_array_repeat() { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let planner = PhysicalPlanner::new(Arc::from(session_ctx)); + let planner = PhysicalPlanner::new(Arc::from(session_ctx), 0); // Mock scan operator with 3 INT32 columns let op_scan = Operator { diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 840e43d57..2f23a8cfd 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -687,7 +687,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat try_unwrap_or_throw(&e, |mut env| unsafe { let session_config = SessionConfig::new().with_batch_size(batch_size as usize); let planer = - PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config))); + PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)), 0); let session_ctx = planer.session_ctx(); let path: String = env diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 0886eb561..5c2eeac93 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -83,6 +83,7 @@ message Expr { ArrayInsert array_insert = 58; MathExpr integral_divide = 59; ToPrettyString to_pretty_string = 60; + UnaryExpr rand = 61; } } diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index d3d4d45d5..8cfedf576 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -52,11 +52,13 @@ pub use cast::{spark_cast, Cast, SparkCastOptions}; mod conditional_funcs; mod conversion_funcs; mod math_funcs; +mod nondetermenistic_funcs; pub use array_funcs::*; pub use bitwise_funcs::*; pub use conditional_funcs::*; pub use conversion_funcs::*; +pub use nondetermenistic_funcs::*; pub use comet_scalar_funcs::{create_comet_physical_fun, register_all_comet_functions}; pub use datetime_funcs::{ diff --git a/native/spark-expr/src/nondetermenistic_funcs/mod.rs b/native/spark-expr/src/nondetermenistic_funcs/mod.rs new file mode 100644 index 000000000..c5ff894e8 --- /dev/null +++ b/native/spark-expr/src/nondetermenistic_funcs/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod rand; + +pub use rand::RandExpr; diff --git a/native/spark-expr/src/nondetermenistic_funcs/rand.rs b/native/spark-expr/src/nondetermenistic_funcs/rand.rs new file mode 100644 index 000000000..903661f4f --- /dev/null +++ b/native/spark-expr/src/nondetermenistic_funcs/rand.rs @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::hash_funcs::murmur3::spark_compatible_murmur3_hash; +use arrow::array::{Float64Array, Float64Builder, RecordBatch}; +use arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result; +use datafusion::common::ScalarValue; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; +use std::any::Any; +use std::fmt::{Display, Formatter}; +use std::hash::{Hash, Hasher}; +use std::sync::{Arc, Mutex}; + +/// Adoption of the XOR-shift algorithm used in Apache Spark. +/// See: https://github.com/apache/spark/blob/91f3fdd25852b43095dd5273358fc394ffd11b66/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +/// Normalization multiplier used in mapping from a random i64 value to the f64 interval [0.0, 1.0). +/// Corresponds to the java implementation: https://github.com/openjdk/jdk/blob/07c9f7138affdf0d42ecdc30adcb854515569985/src/java.base/share/classes/java/util/Random.java#L302 +/// Due to the lack of hexadecimal float literals support in rust, the scientific notation is used instead. +const DOUBLE_UNIT: f64 = 1.1102230246251565e-16; + +/// Spark-compatible initial seed which is actually a part of the scala standard library murmurhash3 implementation. +/// The references: +/// https://github.com/apache/spark/blob/91f3fdd25852b43095dd5273358fc394ffd11b66/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala#L63 +/// https://github.com/scala/scala/blob/360d5da544d84b821c40e4662ad08703b51a44e1/src/library/scala/util/hashing/MurmurHash3.scala#L331 +const SPARK_MURMUR_ARRAY_SEED: u32 = 0x3c074a61; + +#[derive(Debug, Clone)] +struct XorShiftRandom { + seed: i64, +} + +impl XorShiftRandom { + fn from_init_seed(init_seed: i64) -> Self { + XorShiftRandom { + seed: Self::init_seed(init_seed), + } + } + + fn from_stored_seed(stored_seed: i64) -> Self { + XorShiftRandom { seed: stored_seed } + } + + fn next(&mut self, bits: u8) -> i32 { + let mut next_seed = self.seed ^ (self.seed << 21); + next_seed ^= ((next_seed as u64) >> 35) as i64; + next_seed ^= next_seed << 4; + self.seed = next_seed; + (next_seed & ((1i64 << bits) - 1)) as i32 + } + + pub fn next_f64(&mut self) -> f64 { + let a = self.next(26) as i64; + let b = self.next(27) as i64; + ((a << 27) + b) as f64 * DOUBLE_UNIT + } + + fn init_seed(init: i64) -> i64 { + let bytes_repr = init.to_be_bytes(); + let low_bits = spark_compatible_murmur3_hash(bytes_repr, SPARK_MURMUR_ARRAY_SEED); + let high_bits = spark_compatible_murmur3_hash(bytes_repr, low_bits); + ((high_bits as i64) << 32) | (low_bits as i64 & 0xFFFFFFFFi64) + } +} + +#[derive(Debug)] +pub struct RandExpr { + seed: Arc<dyn PhysicalExpr>, + init_seed_shift: i32, + state_holder: Arc<Mutex<Option<i64>>>, +} + +impl RandExpr { + pub fn new(seed: Arc<dyn PhysicalExpr>, init_seed_shift: i32) -> Self { + Self { + seed, + init_seed_shift, + state_holder: Arc::new(Mutex::new(None::<i64>)), + } + } + + fn extract_init_state(seed: ScalarValue) -> Result<i64> { + if let ScalarValue::Int64(seed_opt) = seed.cast_to(&DataType::Int64)? { + Ok(seed_opt.unwrap_or(0)) + } else { + Err(DataFusionError::Internal( + "unexpected execution branch".to_string(), + )) + } + } + fn evaluate_batch(&self, seed: ScalarValue, num_rows: usize) -> Result<ColumnarValue> { + let mut seed_state = self.state_holder.lock().unwrap(); + let mut rnd = if seed_state.is_none() { + let init_seed = RandExpr::extract_init_state(seed)?; + let init_seed = init_seed.wrapping_add(self.init_seed_shift as i64); + *seed_state = Some(init_seed); + XorShiftRandom::from_init_seed(init_seed) + } else { + let stored_seed = seed_state.unwrap(); + XorShiftRandom::from_stored_seed(stored_seed) + }; + + let mut arr_builder = Float64Builder::with_capacity(num_rows); + std::iter::repeat_with(|| rnd.next_f64()) + .take(num_rows) + .for_each(|v| arr_builder.append_value(v)); + let array_ref = Arc::new(Float64Array::from(arr_builder.finish())); + *seed_state = Some(rnd.seed); + Ok(ColumnarValue::Array(array_ref)) + } +} + +impl Display for RandExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "RAND({})", self.seed) + } +} + +impl PartialEq for RandExpr { + fn eq(&self, other: &Self) -> bool { + self.seed.eq(&other.seed) && self.init_seed_shift == other.init_seed_shift + } +} + +impl Eq for RandExpr {} + +impl Hash for RandExpr { + fn hash<H: Hasher>(&self, state: &mut H) { + self.children().hash(state); + } +} + +impl PhysicalExpr for RandExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _input_schema: &Schema) -> Result<DataType> { + Ok(DataType::Float64) + } + + fn nullable(&self, _input_schema: &Schema) -> Result<bool> { + Ok(false) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { + match self.seed.evaluate(batch)? { + ColumnarValue::Scalar(seed) => self.evaluate_batch(seed, batch.num_rows()), + ColumnarValue::Array(_arr) => Err(DataFusionError::NotImplemented(format!( + "Only literal seeds are supported for {}", + self + ))), + } + } + + fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { + vec![&self.seed] + } + + fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + unimplemented!() + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn PhysicalExpr>>, + ) -> Result<Arc<dyn PhysicalExpr>> { + Ok(Arc::new(RandExpr::new( + Arc::clone(&children[0]), + self.init_seed_shift, + ))) + } +} + +pub fn rand(seed: Arc<dyn PhysicalExpr>, init_seed_shift: i32) -> Result<Arc<dyn PhysicalExpr>> { + Ok(Arc::new(RandExpr::new(seed, init_seed_shift))) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, BooleanArray, Int64Array}; + use arrow::{array::StringArray, compute::concat, datatypes::*}; + use datafusion::common::cast::as_float64_array; + use datafusion::physical_expr::expressions::lit; + + const SPARK_SEED_42_FIRST_5: [f64; 5] = [ + 0.619189370225301, + 0.5096018842446481, + 0.8325259388871524, + 0.26322809041172357, + 0.6702867696264135, + ]; + + #[test] + fn test_rand_single_batch() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]); + let data = StringArray::from(vec![Some("foo"), None, None, Some("bar"), None]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?; + let rand_expr = rand(lit(42), 0)?; + let result = rand_expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_float64_array(&result)?; + let expected = &Float64Array::from(Vec::from(SPARK_SEED_42_FIRST_5)); + assert_eq!(result, expected); + Ok(()) + } + + #[test] + fn test_rand_multi_batch() -> Result<()> { + let first_batch_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); + let first_batch_data = Int64Array::from(vec![Some(42), None]); + let second_batch_schema = first_batch_schema.clone(); + let second_batch_data = Int64Array::from(vec![None, Some(-42), None]); + let rand_expr = rand(lit(42), 0)?; + let first_batch = RecordBatch::try_new( + Arc::new(first_batch_schema), + vec![Arc::new(first_batch_data)], + )?; + let first_batch_result = rand_expr + .evaluate(&first_batch)? + .into_array(first_batch.num_rows())?; + let second_batch = RecordBatch::try_new( + Arc::new(second_batch_schema), + vec![Arc::new(second_batch_data)], + )?; + let second_batch_result = rand_expr + .evaluate(&second_batch)? + .into_array(second_batch.num_rows())?; + let result_arrays: Vec<&dyn Array> = vec![ + as_float64_array(&first_batch_result)?, + as_float64_array(&second_batch_result)?, + ]; + let result_arrays = &concat(&result_arrays)?; + let final_result = as_float64_array(result_arrays)?; + let expected = &Float64Array::from(Vec::from(SPARK_SEED_42_FIRST_5)); + assert_eq!(final_result, expected); + Ok(()) + } + + #[test] + fn test_overflow_shift_seed() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + let data = BooleanArray::from(vec![Some(true), Some(false)]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?; + let max_seed_and_shift_expr = rand(lit(i64::MAX), 1)?; + let min_seed_no_shift_expr = rand(lit(i64::MIN), 0)?; + let first_expr_result = max_seed_and_shift_expr + .evaluate(&batch)? + .into_array(batch.num_rows())?; + let first_expr_result = as_float64_array(&first_expr_result)?; + let second_expr_result = min_seed_no_shift_expr + .evaluate(&batch)? + .into_array(batch.num_rows())?; + let second_expr_result = as_float64_array(&second_expr_result)?; + assert_eq!(first_expr_result, second_expr_result); + Ok(()) + } +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9671ef9d7..4e45311d0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1972,6 +1972,16 @@ object QueryPlanSerde extends Logging with CometExprShim { } case _ @ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] => convert(CometArrayCompact) + case _: ArrayExcept => + convert(CometArrayExcept) + case Rand(child, _) => + createUnaryExpr( + expr, + child, + inputs, + binding, + (builder, unaryExpr) => builder.setRand(unaryExpr)) + case mk: MapKeys => val childExpr = exprToProtoInternal(mk.child, inputs, binding) scalarFunctionExprToProto("map_keys", childExpr) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 8fe04009c..ce9ac120c 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2806,6 +2806,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("rand expression with random parameters") { + val partitionsNumber = Random.nextInt(10) + 1 + val rowsNumber = Random.nextInt(500) + val seed = Random.nextLong() + // use this value to have both single-batch and multi-batch partitions + val cometBatchSize = math.max(1, math.ceil(rowsNumber.toDouble / partitionsNumber).toInt) + withSQLConf("spark.comet.batchSize" -> cometBatchSize.toString) { + withParquetDataFrame((0 until rowsNumber).map(Tuple1.apply)) { df => + val dfWithRandParameters = df.repartition(partitionsNumber).withColumn("rnd", rand(seed)) + checkSparkAnswerAndOperator(dfWithRandParameters) + val dfWithOverflowSeed = + df.repartition(partitionsNumber).withColumn("rnd", rand(Long.MaxValue)) + checkSparkAnswerAndOperator(dfWithOverflowSeed) + val dfWithNullSeed = + df.repartition(partitionsNumber).selectExpr("_1", "rand(null) as rnd") + checkSparkAnswerAndOperator(dfWithNullSeed) + } + } + } + test("window query with rangeBetween") { // values are int --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org