This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fb40506b3f Update regenerate sql dep, revert runner changes. (#14901)
fb40506b3f is described below

commit fb40506b3f8d90930d4bf2b8b00fca5a5eac11fe
Author: Bruce Ritchie <[email protected]>
AuthorDate: Wed Feb 26 18:30:36 2025 -0500

    Update regenerate sql dep, revert runner changes. (#14901)
---
 datafusion/sqllogictest/Cargo.toml                 |   2 +
 .../src/engines/datafusion_engine/runner.rs        | 131 -------
 .../regenerate/src/engines/postgres_engine/mod.rs  | 379 ---------------------
 datafusion/sqllogictest/regenerate_sqlite_files.sh |  11 +-
 4 files changed, 7 insertions(+), 516 deletions(-)

diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml
index bc1b283eda..504e516480 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -53,6 +53,8 @@ object_store = { workspace = true }
 postgres-protocol = { version = "0.6.7", optional = true }
 postgres-types = { version = "0.2.8", features = ["derive", 
"with-chrono-0_4"], optional = true }
 rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
+# When updating the following dependency verify that sqlite test file regeneration works correctly
+# by running the regenerate_sqlite_files.sh script.
 sqllogictest = "0.27.2"
 sqlparser = { workspace = true }
 tempfile = { workspace = true }
diff --git a/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs
deleted file mode 100644
index e696058484..0000000000
--- a/datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-use std::{path::PathBuf, time::Duration};
-
-use super::{error::Result, normalize, DFSqlLogicTestError};
-use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
-use datafusion::physical_plan::common::collect;
-use datafusion::physical_plan::execute_stream;
-use datafusion::prelude::SessionContext;
-use indicatif::ProgressBar;
-use log::Level::{Debug, Info};
-use log::{debug, log_enabled, warn};
-use sqllogictest::DBOutput;
-use tokio::time::Instant;
-
-use crate::engines::output::{DFColumnType, DFOutput};
-
-pub struct DataFusion {
-    ctx: SessionContext,
-    relative_path: PathBuf,
-    pb: ProgressBar,
-}
-
-impl DataFusion {
-    pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
-        Self {
-            ctx,
-            relative_path,
-            pb,
-        }
-    }
-
-    fn update_slow_count(&self) {
-        let msg = self.pb.message();
-        let split: Vec<&str> = msg.split(" ").collect();
-        let mut current_count = 0;
-
-        if split.len() > 2 {
-            // third match will be current slow count
-            current_count = split[2].parse::<i32>().unwrap();
-        }
-
-        current_count += 1;
-
-        self.pb
-            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
-    }
-}
-
-#[async_trait]
-impl sqllogictest::AsyncDB for DataFusion {
-    type Error = DFSqlLogicTestError;
-    type ColumnType = DFColumnType;
-
-    async fn run(&mut self, sql: &str) -> Result<DFOutput> {
-        if log_enabled!(Debug) {
-            debug!(
-                "[{}] Running query: \"{}\"",
-                self.relative_path.display(),
-                sql
-            );
-        }
-
-        let start = Instant::now();
-        let result = run_query(&self.ctx, sql).await;
-        let duration = start.elapsed();
-
-        if duration.gt(&Duration::from_millis(500)) {
-            self.update_slow_count();
-        }
-
-        self.pb.inc(1);
-
-        if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
-            warn!(
-                "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"",
-                self.relative_path.display()
-            );
-        }
-
-        result
-    }
-
-    /// Engine name of current database.
-    fn engine_name(&self) -> &str {
-        "DataFusion"
-    }
-
-    /// [`DataFusion`] calls this function to perform sleep.
-    ///
-    /// The default implementation is `std::thread::sleep`, which is universal 
to any async runtime
-    /// but would block the current thread. If you are running in tokio 
runtime, you should override
-    /// this by `tokio::time::sleep`.
-    async fn sleep(dur: Duration) {
-        tokio::time::sleep(dur).await;
-    }
-}
-
-async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> 
Result<DFOutput> {
-    let df = ctx.sql(sql.into().as_str()).await?;
-    let task_ctx = Arc::new(df.task_ctx());
-    let plan = df.create_physical_plan().await?;
-
-    let stream = execute_stream(plan, task_ctx)?;
-    let types = normalize::convert_schema_to_types(stream.schema().fields());
-    let results: Vec<RecordBatch> = collect(stream).await?;
-    let rows = normalize::convert_batches(results)?;
-
-    if rows.is_empty() && types.is_empty() {
-        Ok(DBOutput::StatementComplete(0))
-    } else {
-        Ok(DBOutput::Rows { types, rows })
-    }
-}
diff --git a/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs
deleted file mode 100644
index 050d19449c..0000000000
--- a/datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs
+++ /dev/null
@@ -1,379 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use datafusion::common::runtime::SpawnedTask;
-use futures::{SinkExt, StreamExt};
-use log::{debug, info};
-use sqllogictest::DBOutput;
-/// Postgres engine implementation for sqllogictest.
-use std::path::{Path, PathBuf};
-use std::str::FromStr;
-use std::time::Duration;
-
-use super::conversion::*;
-use crate::engines::output::{DFColumnType, DFOutput};
-use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
-use indicatif::ProgressBar;
-use postgres_types::Type;
-use rust_decimal::Decimal;
-use tokio::time::Instant;
-use tokio_postgres::{Column, Row};
-use types::PgRegtype;
-
-mod types;
-
-// default connect string, can be overridden by the `PG_URL` environment variable
-const PG_URI: &str = "postgresql://postgres@127.0.0.1/test";
-
-/// DataFusion sql-logicaltest error
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
-    #[error("Postgres error: {0}")]
-    Postgres(#[from] tokio_postgres::error::Error),
-    #[error("Error handling copy command: {0}")]
-    Copy(String),
-}
-
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
-pub struct Postgres {
-    // None means the connection has been shutdown
-    client: Option<tokio_postgres::Client>,
-    _spawned_task: Option<SpawnedTask<()>>,
-    /// Relative test file path
-    relative_path: PathBuf,
-    pb: ProgressBar,
-}
-
-impl Postgres {
-    /// Creates a runner for executing queries against an existing postgres 
connection.
-    /// `relative_path` is used for display output and to create a postgres 
schema.
-    ///
-    /// The database connection details can be overridden by the
-    /// `PG_URI` environment variable.
-    ///
-    /// This defaults to
-    ///
-    /// ```text
-    /// PG_URI="postgresql://postgres@127.0.0.1/test"
-    /// ```
-    ///
-    /// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format
-    pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> 
Result<Self> {
-        let uri =
-            std::env::var("PG_URI").map_or(PG_URI.to_string(), 
std::convert::identity);
-
-        info!("Using postgres connection string: {uri}");
-
-        let config = tokio_postgres::Config::from_str(&uri)?;
-
-        // hint to user what the connection string was
-        let res = config.connect(tokio_postgres::NoTls).await;
-        if res.is_err() {
-            eprintln!("Error connecting to postgres using PG_URI={uri}");
-        };
-
-        let (client, connection) = res?;
-
-        let spawned_task = SpawnedTask::spawn(async move {
-            if let Err(e) = connection.await {
-                log::error!("Postgres connection error: {:?}", e);
-            }
-        });
-
-        let schema = schema_name(&relative_path);
-
-        // create a new clean schema for running the test
-        debug!("Creating new empty schema '{schema}'");
-        client
-            .execute(&format!("DROP SCHEMA IF EXISTS {schema} CASCADE"), &[])
-            .await?;
-
-        client
-            .execute(&format!("CREATE SCHEMA {schema}"), &[])
-            .await?;
-
-        client
-            .execute(&format!("SET search_path TO {schema}"), &[])
-            .await?;
-
-        Ok(Self {
-            client: Some(client),
-            _spawned_task: Some(spawned_task),
-            relative_path,
-            pb,
-        })
-    }
-
-    fn get_client(&mut self) -> &mut tokio_postgres::Client {
-        self.client.as_mut().expect("client is shutdown")
-    }
-
-    /// Special COPY command support. "COPY 'filename'" requires the
-    /// server to read the file which may not be possible (maybe it is
-    /// remote or running in some other docker container).
-    ///
-    /// Thus, we rewrite  sql statements like
-    ///
-    /// ```sql
-    /// COPY ... FROM 'filename' ...
-    /// ```
-    ///
-    /// Into
-    ///
-    /// ```sql
-    /// COPY ... FROM STDIN ...
-    /// ```
-    ///
-    /// And read the file locally.
-    async fn run_copy_command(&mut self, sql: &str) -> Result<DFOutput> {
-        let canonical_sql = sql.trim_start().to_ascii_lowercase();
-
-        debug!("Handling COPY command: {sql}");
-
-        // Hacky way to  find the 'filename' in the statement
-        let mut tokens = canonical_sql.split_whitespace().peekable();
-        let mut filename = None;
-
-        // COPY FROM '/opt/data/csv/aggregate_test_100.csv' ...
-        //
-        // into
-        //
-        // COPY FROM STDIN ...
-
-        let mut new_sql = vec![];
-        while let Some(tok) = tokens.next() {
-            new_sql.push(tok);
-            // rewrite FROM <file> to FROM STDIN
-            if tok == "from" {
-                filename = tokens.next();
-                new_sql.push("STDIN");
-            }
-        }
-
-        let filename = filename.map(no_quotes).ok_or_else(|| {
-            Error::Copy(format!("Can not find filename in COPY: {sql}"))
-        })?;
-
-        let new_sql = new_sql.join(" ");
-        debug!("Copying data from file {filename} using sql: {new_sql}");
-
-        // start the COPY command and get location to write data to
-        let tx = self.get_client().transaction().await?;
-        let sink = tx.copy_in(&new_sql).await?;
-        let mut sink = Box::pin(sink);
-
-        // read the input file as a string ans feed it to the copy command
-        let data = std::fs::read_to_string(filename)
-            .map_err(|e| Error::Copy(format!("Error reading {filename}: {e}")))?;
-
-        let mut data_stream = 
futures::stream::iter(vec![Ok(Bytes::from(data))]).boxed();
-
-        sink.send_all(&mut data_stream).await?;
-        sink.close().await?;
-        tx.commit().await?;
-        Ok(DBOutput::StatementComplete(0))
-    }
-
-    fn update_slow_count(&self) {
-        let msg = self.pb.message();
-        let split: Vec<&str> = msg.split(" ").collect();
-        let mut current_count = 0;
-
-        if split.len() > 2 {
-            // second match will be current slow count
-            current_count += split[2].parse::<i32>().unwrap();
-        }
-
-        current_count += 1;
-
-        self.pb
-            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
-    }
-}
-
-/// remove single quotes from the start and end of the string
-///
-/// 'filename' --> filename
-fn no_quotes(t: &str) -> &str {
-    t.trim_start_matches('\'').trim_end_matches('\'')
-}
-
-/// Given a file name like pg_compat_foo.slt
-/// return a schema name
-fn schema_name(relative_path: &Path) -> String {
-    relative_path
-        .to_string_lossy()
-        .chars()
-        .filter(|ch| ch.is_ascii_alphanumeric())
-        .collect::<String>()
-        .trim_start_matches("pg_")
-        .to_string()
-}
-
-#[async_trait]
-impl sqllogictest::AsyncDB for Postgres {
-    type Error = Error;
-    type ColumnType = DFColumnType;
-
-    async fn run(
-        &mut self,
-        sql: &str,
-    ) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
-        debug!(
-            "[{}] Running query: \"{}\"",
-            self.relative_path.display(),
-            sql
-        );
-
-        let lower_sql = sql.trim_start().to_ascii_lowercase();
-
-        let is_query_sql = {
-            lower_sql.starts_with("select")
-                || lower_sql.starts_with("values")
-                || lower_sql.starts_with("show")
-                || lower_sql.starts_with("with")
-                || lower_sql.starts_with("describe")
-                || ((lower_sql.starts_with("insert")
-                    || lower_sql.starts_with("update")
-                    || lower_sql.starts_with("delete"))
-                    && lower_sql.contains("returning"))
-        };
-
-        if lower_sql.starts_with("copy") {
-            self.pb.inc(1);
-            return self.run_copy_command(sql).await;
-        }
-
-        if !is_query_sql {
-            self.get_client().execute(sql, &[]).await?;
-            self.pb.inc(1);
-            return Ok(DBOutput::StatementComplete(0));
-        }
-        let start = Instant::now();
-        let rows = self.get_client().query(sql, &[]).await?;
-        let duration = start.elapsed();
-
-        if duration.gt(&Duration::from_millis(500)) {
-            self.update_slow_count();
-        }
-
-        self.pb.inc(1);
-
-        let types: Vec<Type> = if rows.is_empty() {
-            self.get_client()
-                .prepare(sql)
-                .await?
-                .columns()
-                .iter()
-                .map(|c| c.type_().clone())
-                .collect()
-        } else {
-            rows[0]
-                .columns()
-                .iter()
-                .map(|c| c.type_().clone())
-                .collect()
-        };
-
-        if rows.is_empty() && types.is_empty() {
-            Ok(DBOutput::StatementComplete(0))
-        } else {
-            Ok(DBOutput::Rows {
-                types: convert_types(types),
-                rows: convert_rows(rows),
-            })
-        }
-    }
-
-    fn engine_name(&self) -> &str {
-        "postgres"
-    }
-
-}
-
-fn convert_rows(rows: Vec<Row>) -> Vec<Vec<String>> {
-    rows.iter()
-        .map(|row| {
-            row.columns()
-                .iter()
-                .enumerate()
-                .map(|(idx, column)| cell_to_string(row, column, idx))
-                .collect::<Vec<String>>()
-        })
-        .collect::<Vec<_>>()
-}
-
-macro_rules! make_string {
-    ($row:ident, $idx:ident, $t:ty) => {{
-        let value: Option<$t> = $row.get($idx);
-        match value {
-            Some(value) => value.to_string(),
-            None => NULL_STR.to_string(),
-        }
-    }};
-    ($row:ident, $idx:ident, $t:ty, $convert:ident) => {{
-        let value: Option<$t> = $row.get($idx);
-        match value {
-            Some(value) => $convert(value).to_string(),
-            None => NULL_STR.to_string(),
-        }
-    }};
-}
-
-fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {
-    match column.type_().clone() {
-        Type::CHAR => make_string!(row, idx, i8),
-        Type::INT2 => make_string!(row, idx, i16),
-        Type::INT4 => make_string!(row, idx, i32),
-        Type::INT8 => make_string!(row, idx, i64),
-        Type::NUMERIC => make_string!(row, idx, Decimal, decimal_to_str),
-        Type::DATE => make_string!(row, idx, NaiveDate),
-        Type::TIME => make_string!(row, idx, NaiveTime),
-        Type::TIMESTAMP => {
-            let value: Option<NaiveDateTime> = row.get(idx);
-            value
-                .map(|d| format!("{d:?}"))
-                .unwrap_or_else(|| "NULL".to_string())
-        }
-        Type::BOOL => make_string!(row, idx, bool, bool_to_str),
-        Type::BPCHAR | Type::VARCHAR | Type::TEXT => {
-            make_string!(row, idx, &str, varchar_to_str)
-        }
-        Type::FLOAT4 => make_string!(row, idx, f32, f32_to_str),
-        Type::FLOAT8 => make_string!(row, idx, f64, f64_to_str),
-        Type::REGTYPE => make_string!(row, idx, PgRegtype),
-        _ => unimplemented!("Unsupported type: {}", column.type_().name()),
-    }
-}
-
-fn convert_types(types: Vec<Type>) -> Vec<DFColumnType> {
-    types
-        .into_iter()
-        .map(|t| match t {
-            Type::BOOL => DFColumnType::Boolean,
-            Type::INT2 | Type::INT4 | Type::INT8 => DFColumnType::Integer,
-            Type::BPCHAR | Type::VARCHAR | Type::TEXT => DFColumnType::Text,
-            Type::FLOAT4 | Type::FLOAT8 | Type::NUMERIC => DFColumnType::Float,
-            Type::DATE | Type::TIME => DFColumnType::DateTime,
-            Type::TIMESTAMP => DFColumnType::Timestamp,
-            _ => DFColumnType::Another,
-        })
-        .collect()
-}
diff --git a/datafusion/sqllogictest/regenerate_sqlite_files.sh b/datafusion/sqllogictest/regenerate_sqlite_files.sh
index 981891c05b..615b910680 100755
--- a/datafusion/sqllogictest/regenerate_sqlite_files.sh
+++ b/datafusion/sqllogictest/regenerate_sqlite_files.sh
@@ -164,14 +164,12 @@ echo "Updating the datafusion/sqllogictest/Cargo.toml file with an updated sqllo
 
 # update the sqllogictest Cargo.toml with the new repo for sqllogictest-rs 
(tied to a specific hash)
 cd "$DF_HOME" || exit;
-sd -f i '^sqllogictest.*' 'sqllogictest = { git = "https://github.com/Omega359/sqllogictest-rs.git", rev = "1cd933d" }' datafusion/sqllogictest/Cargo.toml
+sd -f i '^sqllogictest.*' 'sqllogictest = { git = "https://github.com/Omega359/sqllogictest-rs.git", rev = "73c47cf7" }' datafusion/sqllogictest/Cargo.toml
 
 echo "Replacing the datafusion/sqllogictest/bin/sqllogictests.rs file with a 
custom version required for running completion"
 
-# replace the sqllogictest.rs with a customized versions.
-cp datafusion/sqllogictest/regenerate/sqllogictests.rs                        
datafusion/sqllogictest/bin/sqllogictests.rs
-cp datafusion/sqllogictest/regenerate/src/engines/postgres_engine/mod.rs      
datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
-cp datafusion/sqllogictest/regenerate/src/engines/datafusion_engine/runner.rs 
datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
+# replace the sqllogictest.rs with a customized version.
+cp datafusion/sqllogictest/regenerate/sqllogictests.rs datafusion/sqllogictest/bin/sqllogictests.rs
 
 echo "Running the sqllogictests with sqlite completion. This will take 
approximately an hour to run"
 
@@ -202,5 +200,6 @@ echo "Cleaning up source code changes and temporary files and directories"
 cd "$DF_HOME" || exit;
 find ./datafusion-testing/data/sqlite/ -type f -name "*.bak" -exec rm {} \;
 find ./datafusion/sqllogictest/test_files/pg_compat/ -type f -name "*.bak" -exec rm {} \;
-git checkout datafusion/sqllogictest
+git checkout datafusion/sqllogictest/Cargo.toml
+git checkout datafusion/sqllogictest/bin/sqllogictests.rs
 rm -rf /tmp/sqlitetesting


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to