This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3193014da7 Fix sqllogictest runner regeneration logic (#14881)
3193014da7 is described below

commit 3193014da7f85f5847aa1c7a2a16ea395e2e6099
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Tue Feb 25 20:35:37 2025 -0500

    Fix sqllogictest runner regeneration logic (#14881)
---
 datafusion/sqllogictest/README.md                  |   8 +
 .../sqllogictest/regenerate/sqllogictests.rs       |  12 +-
 .../src/engines/datafusion_engine/runner.rs        | 131 +++++++
 .../regenerate/src/engines/postgres_engine/mod.rs  | 379 +++++++++++++++++++++
 datafusion/sqllogictest/regenerate_sqlite_files.sh |   9 +-
 5 files changed, 529 insertions(+), 10 deletions(-)

diff --git a/datafusion/sqllogictest/README.md 
b/datafusion/sqllogictest/README.md
index 257937a65e..a18455476d 100644
--- a/datafusion/sqllogictest/README.md
+++ b/datafusion/sqllogictest/README.md
@@ -243,6 +243,14 @@ export RUST_MIN_STACK=30485760;
 PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test 
sqllogictests
 ```
 
+To update the sqllite expected answers use the 
`datafusion/sqllogictest/regenerate_sqlite_files.sh` script.
+
+Note this must be run with an empty postgres instance. For example
+
+```shell
+PG_URI=postgresql://postgres@localhost:5432/postgres bash 
datafusion/sqllogictest/regenerate_sqlite_files.sh
+```
+
 ## Updating tests: Completion Mode
 
 In test script completion mode, `sqllogictests` reads a prototype script and 
runs the statements and queries against the
diff --git a/datafusion/sqllogictest/regenerate/sqllogictests.rs 
b/datafusion/sqllogictest/regenerate/sqllogictests.rs
index a270655881..edad16bc84 100644
--- a/datafusion/sqllogictest/regenerate/sqllogictests.rs
+++ b/datafusion/sqllogictest/regenerate/sqllogictests.rs
@@ -16,10 +16,10 @@
 // under the License.
 
 use clap::Parser;
-use datafusion_common::instant::Instant;
-use datafusion_common::utils::get_available_parallelism;
-use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, 
Result};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion::common::instant::Instant;
+use datafusion::common::utils::get_available_parallelism;
+use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, 
Result};
+use datafusion::common::runtime::SpawnedTask;
 use datafusion_sqllogictest::{DataFusion, TestContext};
 use futures::stream::StreamExt;
 use indicatif::{
@@ -378,7 +378,7 @@ async fn run_test_file_with_postgres(
     _mp: MultiProgress,
     _mp_style: ProgressStyle,
 ) -> Result<()> {
-    use datafusion_common::plan_err;
+    use datafusion::common::plan_err;
     plan_err!("Can not run with postgres as postgres feature is not enabled")
 }
 
@@ -512,7 +512,7 @@ async fn run_complete_file_with_postgres(
     _mp: MultiProgress,
     _mp_style: ProgressStyle,
 ) -> Result<()> {
-    use datafusion_common::plan_err;
+    use datafusion::common::plan_err;
     plan_err!("Can not run with postgres as postgres feature is not enabled")
 }
 
diff --git 
a/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs 
b/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs
new file mode 100644
index 0000000000..e696058484
--- /dev/null
+++ b/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::{path::PathBuf, time::Duration};
+
+use super::{error::Result, normalize, DFSqlLogicTestError};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion::physical_plan::common::collect;
+use datafusion::physical_plan::execute_stream;
+use datafusion::prelude::SessionContext;
+use indicatif::ProgressBar;
+use log::Level::{Debug, Info};
+use log::{debug, log_enabled, warn};
+use sqllogictest::DBOutput;
+use tokio::time::Instant;
+
+use crate::engines::output::{DFColumnType, DFOutput};
+
+pub struct DataFusion {
+    ctx: SessionContext,
+    relative_path: PathBuf,
+    pb: ProgressBar,
+}
+
+impl DataFusion {
+    pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) 
-> Self {
+        Self {
+            ctx,
+            relative_path,
+            pb,
+        }
+    }
+
+    fn update_slow_count(&self) {
+        let msg = self.pb.message();
+        let split: Vec<&str> = msg.split(" ").collect();
+        let mut current_count = 0;
+
+        if split.len() > 2 {
+            // third match will be current slow count
+            current_count = split[2].parse::<i32>().unwrap();
+        }
+
+        current_count += 1;
+
+        self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
+    }
+}
+
+#[async_trait]
+impl sqllogictest::AsyncDB for DataFusion {
+    type Error = DFSqlLogicTestError;
+    type ColumnType = DFColumnType;
+
+    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
+        if log_enabled!(Debug) {
+            debug!(
+                "[{}] Running query: \"{}\"",
+                self.relative_path.display(),
+                sql
+            );
+        }
+
+        let start = Instant::now();
+        let result = run_query(&self.ctx, sql).await;
+        let duration = start.elapsed();
+
+        if duration.gt(&Duration::from_millis(500)) {
+            self.update_slow_count();
+        }
+
+        self.pb.inc(1);
+
+        if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
+            warn!(
+                "[{}] Running query took more than 2 sec ({duration:?}): 
\"{sql}\"",
+                self.relative_path.display()
+            );
+        }
+
+        result
+    }
+
+    /// Engine name of current database.
+    fn engine_name(&self) -> &str {
+        "DataFusion"
+    }
+
+    /// [`DataFusion`] calls this function to perform sleep.
+    ///
+    /// The default implementation is `std::thread::sleep`, which is universal 
to any async runtime
+    /// but would block the current thread. If you are running in tokio 
runtime, you should override
+    /// this by `tokio::time::sleep`.
+    async fn sleep(dur: Duration) {
+        tokio::time::sleep(dur).await;
+    }
+}
+
+async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> 
Result<DFOutput> {
+    let df = ctx.sql(sql.into().as_str()).await?;
+    let task_ctx = Arc::new(df.task_ctx());
+    let plan = df.create_physical_plan().await?;
+
+    let stream = execute_stream(plan, task_ctx)?;
+    let types = normalize::convert_schema_to_types(stream.schema().fields());
+    let results: Vec<RecordBatch> = collect(stream).await?;
+    let rows = normalize::convert_batches(results)?;
+
+    if rows.is_empty() && types.is_empty() {
+        Ok(DBOutput::StatementComplete(0))
+    } else {
+        Ok(DBOutput::Rows { types, rows })
+    }
+}
diff --git 
a/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs 
b/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs
new file mode 100644
index 0000000000..050d19449c
--- /dev/null
+++ b/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion::common::runtime::SpawnedTask;
+use futures::{SinkExt, StreamExt};
+use log::{debug, info};
+use sqllogictest::DBOutput;
+/// Postgres engine implementation for sqllogictest.
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
+use std::time::Duration;
+
+use super::conversion::*;
+use crate::engines::output::{DFColumnType, DFOutput};
+use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use indicatif::ProgressBar;
+use postgres_types::Type;
+use rust_decimal::Decimal;
+use tokio::time::Instant;
+use tokio_postgres::{Column, Row};
+use types::PgRegtype;
+
+mod types;
+
+// default connect string, can be overridden by the `PG_URL` environment 
variable
+const PG_URI: &str = "postgresql://postgres@127.0.0.1/test";
+
+/// DataFusion sql-logicaltest error
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("Postgres error: {0}")]
+    Postgres(#[from] tokio_postgres::error::Error),
+    #[error("Error handling copy command: {0}")]
+    Copy(String),
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+pub struct Postgres {
+    // None means the connection has been shutdown
+    client: Option<tokio_postgres::Client>,
+    _spawned_task: Option<SpawnedTask<()>>,
+    /// Relative test file path
+    relative_path: PathBuf,
+    pb: ProgressBar,
+}
+
+impl Postgres {
+    /// Creates a runner for executing queries against an existing postgres 
connection.
+    /// `relative_path` is used for display output and to create a postgres 
schema.
+    ///
+    /// The database connection details can be overridden by the
+    /// `PG_URI` environment variable.
+    ///
+    /// This defaults to
+    ///
+    /// ```text
+    /// PG_URI="postgresql://postgres@127.0.0.1/test"
+    /// ```
+    ///
+    /// See 
https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
 for format
+    pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> 
Result<Self> {
+        let uri =
+            std::env::var("PG_URI").map_or(PG_URI.to_string(), 
std::convert::identity);
+
+        info!("Using postgres connection string: {uri}");
+
+        let config = tokio_postgres::Config::from_str(&uri)?;
+
+        // hint to user what the connection string was
+        let res = config.connect(tokio_postgres::NoTls).await;
+        if res.is_err() {
+            eprintln!("Error connecting to postgres using PG_URI={uri}");
+        };
+
+        let (client, connection) = res?;
+
+        let spawned_task = SpawnedTask::spawn(async move {
+            if let Err(e) = connection.await {
+                log::error!("Postgres connection error: {:?}", e);
+            }
+        });
+
+        let schema = schema_name(&relative_path);
+
+        // create a new clean schema for running the test
+        debug!("Creating new empty schema '{schema}'");
+        client
+            .execute(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE"), &[])
+            .await?;
+
+        client
+            .execute(&format!("CREATE SCHEMA {schema}"), &[])
+            .await?;
+
+        client
+            .execute(&format!("SET search_path TO {schema}"), &[])
+            .await?;
+
+        Ok(Self {
+            client: Some(client),
+            _spawned_task: Some(spawned_task),
+            relative_path,
+            pb,
+        })
+    }
+
+    fn get_client(&mut self) -> &mut tokio_postgres::Client {
+        self.client.as_mut().expect("client is shutdown")
+    }
+
+    /// Special COPY command support. "COPY 'filename'" requires the
+    /// server to read the file which may not be possible (maybe it is
+    /// remote or running in some other docker container).
+    ///
+    /// Thus, we rewrite  sql statements like
+    ///
+    /// ```sql
+    /// COPY ... FROM 'filename' ...
+    /// ```
+    ///
+    /// Into
+    ///
+    /// ```sql
+    /// COPY ... FROM STDIN ...
+    /// ```
+    ///
+    /// And read the file locally.
+    async fn run_copy_command(&mut self, sql: &str) -> Result<DFOutput> {
+        let canonical_sql = sql.trim_start().to_ascii_lowercase();
+
+        debug!("Handling COPY command: {sql}");
+
+        // Hacky way to  find the 'filename' in the statement
+        let mut tokens = canonical_sql.split_whitespace().peekable();
+        let mut filename = None;
+
+        // COPY FROM '/opt/data/csv/aggregate_test_100.csv' ...
+        //
+        // into
+        //
+        // COPY FROM STDIN ...
+
+        let mut new_sql = vec![];
+        while let Some(tok) = tokens.next() {
+            new_sql.push(tok);
+            // rewrite FROM <file> to FROM STDIN
+            if tok == "from" {
+                filename = tokens.next();
+                new_sql.push("STDIN");
+            }
+        }
+
+        let filename = filename.map(no_quotes).ok_or_else(|| {
+            Error::Copy(format!("Can not find filename in COPY: {sql}"))
+        })?;
+
+        let new_sql = new_sql.join(" ");
+        debug!("Copying data from file {filename} using sql: {new_sql}");
+
+        // start the COPY command and get location to write data to
+        let tx = self.get_client().transaction().await?;
+        let sink = tx.copy_in(&new_sql).await?;
+        let mut sink = Box::pin(sink);
+
+        // read the input file as a string ans feed it to the copy command
+        let data = std::fs::read_to_string(filename)
+            .map_err(|e| Error::Copy(format!("Error reading {filename}: 
{e}")))?;
+
+        let mut data_stream = 
futures::stream::iter(vec![Ok(Bytes::from(data))]).boxed();
+
+        sink.send_all(&mut data_stream).await?;
+        sink.close().await?;
+        tx.commit().await?;
+        Ok(DBOutput::StatementComplete(0))
+    }
+
+    fn update_slow_count(&self) {
+        let msg = self.pb.message();
+        let split: Vec<&str> = msg.split(" ").collect();
+        let mut current_count = 0;
+
+        if split.len() > 2 {
+            // second match will be current slow count
+            current_count += split[2].parse::<i32>().unwrap();
+        }
+
+        current_count += 1;
+
+        self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
+    }
+}
+
+/// remove single quotes from the start and end of the string
+///
+/// 'filename' --> filename
+fn no_quotes(t: &str) -> &str {
+    t.trim_start_matches('\'').trim_end_matches('\'')
+}
+
+/// Given a file name like pg_compat_foo.slt
+/// return a schema name
+fn schema_name(relative_path: &Path) -> String {
+    relative_path
+        .to_string_lossy()
+        .chars()
+        .filter(|ch| ch.is_ascii_alphanumeric())
+        .collect::<String>()
+        .trim_start_matches("pg_")
+        .to_string()
+}
+
+#[async_trait]
+impl sqllogictest::AsyncDB for Postgres {
+    type Error = Error;
+    type ColumnType = DFColumnType;
+
+    async fn run(
+        &mut self,
+        sql: &str,
+    ) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
+        debug!(
+            "[{}] Running query: \"{}\"",
+            self.relative_path.display(),
+            sql
+        );
+
+        let lower_sql = sql.trim_start().to_ascii_lowercase();
+
+        let is_query_sql = {
+            lower_sql.starts_with("select")
+                || lower_sql.starts_with("values")
+                || lower_sql.starts_with("show")
+                || lower_sql.starts_with("with")
+                || lower_sql.starts_with("describe")
+                || ((lower_sql.starts_with("insert")
+                    || lower_sql.starts_with("update")
+                    || lower_sql.starts_with("delete"))
+                    && lower_sql.contains("returning"))
+        };
+
+        if lower_sql.starts_with("copy") {
+            self.pb.inc(1);
+            return self.run_copy_command(sql).await;
+        }
+
+        if !is_query_sql {
+            self.get_client().execute(sql, &[]).await?;
+            self.pb.inc(1);
+            return Ok(DBOutput::StatementComplete(0));
+        }
+        let start = Instant::now();
+        let rows = self.get_client().query(sql, &[]).await?;
+        let duration = start.elapsed();
+
+        if duration.gt(&Duration::from_millis(500)) {
+            self.update_slow_count();
+        }
+
+        self.pb.inc(1);
+
+        let types: Vec<Type> = if rows.is_empty() {
+            self.get_client()
+                .prepare(sql)
+                .await?
+                .columns()
+                .iter()
+                .map(|c| c.type_().clone())
+                .collect()
+        } else {
+            rows[0]
+                .columns()
+                .iter()
+                .map(|c| c.type_().clone())
+                .collect()
+        };
+
+        if rows.is_empty() && types.is_empty() {
+            Ok(DBOutput::StatementComplete(0))
+        } else {
+            Ok(DBOutput::Rows {
+                types: convert_types(types),
+                rows: convert_rows(rows),
+            })
+        }
+    }
+
+    fn engine_name(&self) -> &str {
+        "postgres"
+    }
+
+}
+
+fn convert_rows(rows: Vec<Row>) -> Vec<Vec<String>> {
+    rows.iter()
+        .map(|row| {
+            row.columns()
+                .iter()
+                .enumerate()
+                .map(|(idx, column)| cell_to_string(row, column, idx))
+                .collect::<Vec<String>>()
+        })
+        .collect::<Vec<_>>()
+}
+
+macro_rules! make_string {
+    ($row:ident, $idx:ident, $t:ty) => {{
+        let value: Option<$t> = $row.get($idx);
+        match value {
+            Some(value) => value.to_string(),
+            None => NULL_STR.to_string(),
+        }
+    }};
+    ($row:ident, $idx:ident, $t:ty, $convert:ident) => {{
+        let value: Option<$t> = $row.get($idx);
+        match value {
+            Some(value) => $convert(value).to_string(),
+            None => NULL_STR.to_string(),
+        }
+    }};
+}
+
+fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {
+    match column.type_().clone() {
+        Type::CHAR => make_string!(row, idx, i8),
+        Type::INT2 => make_string!(row, idx, i16),
+        Type::INT4 => make_string!(row, idx, i32),
+        Type::INT8 => make_string!(row, idx, i64),
+        Type::NUMERIC => make_string!(row, idx, Decimal, decimal_to_str),
+        Type::DATE => make_string!(row, idx, NaiveDate),
+        Type::TIME => make_string!(row, idx, NaiveTime),
+        Type::TIMESTAMP => {
+            let value: Option<NaiveDateTime> = row.get(idx);
+            value
+                .map(|d| format!("{d:?}"))
+                .unwrap_or_else(|| "NULL".to_string())
+        }
+        Type::BOOL => make_string!(row, idx, bool, bool_to_str),
+        Type::BPCHAR | Type::VARCHAR | Type::TEXT => {
+            make_string!(row, idx, &str, varchar_to_str)
+        }
+        Type::FLOAT4 => make_string!(row, idx, f32, f32_to_str),
+        Type::FLOAT8 => make_string!(row, idx, f64, f64_to_str),
+        Type::REGTYPE => make_string!(row, idx, PgRegtype),
+        _ => unimplemented!("Unsupported type: {}", column.type_().name()),
+    }
+}
+
+fn convert_types(types: Vec<Type>) -> Vec<DFColumnType> {
+    types
+        .into_iter()
+        .map(|t| match t {
+            Type::BOOL => DFColumnType::Boolean,
+            Type::INT2 | Type::INT4 | Type::INT8 => DFColumnType::Integer,
+            Type::BPCHAR | Type::VARCHAR | Type::TEXT => DFColumnType::Text,
+            Type::FLOAT4 | Type::FLOAT8 | Type::NUMERIC => DFColumnType::Float,
+            Type::DATE | Type::TIME => DFColumnType::DateTime,
+            Type::TIMESTAMP => DFColumnType::Timestamp,
+            _ => DFColumnType::Another,
+        })
+        .collect()
+}
diff --git a/datafusion/sqllogictest/regenerate_sqlite_files.sh 
b/datafusion/sqllogictest/regenerate_sqlite_files.sh
index 0c1b26b1a9..981891c05b 100755
--- a/datafusion/sqllogictest/regenerate_sqlite_files.sh
+++ b/datafusion/sqllogictest/regenerate_sqlite_files.sh
@@ -168,8 +168,10 @@ sd -f i '^sqllogictest.*' 'sqllogictest = { git = 
"https://github.com/Omega359/s
 
 echo "Replacing the datafusion/sqllogictest/bin/sqllogictests.rs file with a 
custom version required for running completion"
 
-# replace the sqllogictest.rs with a customized version.
-cp datafusion/sqllogictest/regenerate/sqllogictests.rs 
datafusion/sqllogictest/bin/sqllogictests.rs
+# replace the sqllogictest.rs with a customized versions.
+cp datafusion/sqllogictest/regenerate/sqllogictests.rs                        
datafusion/sqllogictest/bin/sqllogictests.rs
+cp datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs      
datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
+cp datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs 
datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
 
 echo "Running the sqllogictests with sqlite completion. This will take 
approximately an hour to run"
 
@@ -200,6 +202,5 @@ echo "Cleaning up source code changes and temporary files 
and directories"
 cd "$DF_HOME" || exit;
 find ./datafusion-testing/data/sqlite/ -type f -name "*.bak" -exec rm {} \;
 find ./datafusion/sqllogictest/test_files/pg_compat/ -type f -name "*.bak" 
-exec rm {} \;
-git checkout datafusion/sqllogictest/Cargo.toml
-git checkout datafusion/sqllogictest/bin/sqllogictests.rs
+git checkout datafusion/sqllogictest
 rm -rf /tmp/sqlitetesting


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to