This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-benchmarks.git


The following commit(s) were added to refs/heads/main by this push:
     new 964c70f  Add script to generate TPC-H data and convert it to Parquet 
using DataFusion (#2)
964c70f is described below

commit 964c70f6067bf9b041b165decc5277dc38678897
Author: Andy Grove <[email protected]>
AuthorDate: Mon May 20 10:52:19 2024 -0600

    Add script to generate TPC-H data and convert it to Parquet using 
DataFusion (#2)
    
    * save
    
    * basic script
    
    * ASF header
    
    * ASF header
    
    * fix
    
    * improve README
    
    * refactor script
    
    * use thread pool
    
    * Update tpch/tpchgen.py
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * stop running docker image in interactive mode
    
    * add timing
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 README.md       | 19 ++++++++++++
 tpch/.gitignore |  2 ++
 tpch/README.md  | 47 +++++++++++++++++++++++++++++
 tpch/tpchgen.py | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 161 insertions(+)

diff --git a/README.md b/README.md
index e363a4d..7e48bfb 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,22 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
 # Apache DataFusion Benchmarks
 
 This repository has been created as a place to store documentation and scripts 
that can be used to run benchmarks against Apache DataFusion and its 
subprojects (Python Bindings, Comet, and Ballista).
diff --git a/tpch/.gitignore b/tpch/.gitignore
new file mode 100644
index 0000000..081fb32
--- /dev/null
+++ b/tpch/.gitignore
@@ -0,0 +1,2 @@
+data
+venv
diff --git a/tpch/README.md b/tpch/README.md
new file mode 100644
index 0000000..c1bede1
--- /dev/null
+++ b/tpch/README.md
@@ -0,0 +1,47 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# TPC-H
+
+## Generating TPC-H data with Spark
+
+Databricks provides tooling for generating TPC-H datasets in a Spark cluster:
+
+[https://github.com/databricks/spark-sql-perf](https://github.com/databricks/spark-sql-perf)
+
+## Generating TPC-H data without Spark
+
+For local development and testing, we provide a Python script to generate 
TPC-H CSV data and convert it into Parquet, 
+using DataFusion.
+
+The script requires Docker to be available because it uses the Docker image 
`ghcr.io/scalytics/tpch-docker` to run
+the TPC-H data generator.
+
+Data can be generated as a single Parquet file per table by specifying 
`--partitions 1`. 
+
+Data will be generated into a `data` directory in the current working 
directory.
+
+```shell
+python tpchgen.py --scale-factor 1 --partitions 1
+```
+Data can be generated as multiple Parquet files per table by specifying 
`--partitions` greater than one. 
+
+```shell
+python tpchgen.py --scale-factor 1000 --partitions 64
+```
\ No newline at end of file
diff --git a/tpch/tpchgen.py b/tpch/tpchgen.py
new file mode 100644
index 0000000..a0cd406
--- /dev/null
+++ b/tpch/tpchgen.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import concurrent.futures
+from datafusion import SessionContext
+import os
+import subprocess
+import time
+
+table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", 
"region", "supplier"]
+
+def run(cmd: str):
+    print(f"Executing: {cmd}")
+    subprocess.run(cmd, shell=True, check=True)
+
+def run_and_log_output(cmd: str, log_file: str):
+    print(f"Executing: {cmd}; writing output to {log_file}")
+    with open(log_file, "w") as file:
+        subprocess.run(cmd, shell=True, check=True, stdout=file, 
stderr=subprocess.STDOUT)
+
+def convert_tbl_to_parquet(ctx: SessionContext, tbl_filename: str, 
file_extension: str, parquet_filename: str):
+    print(f"Converting {tbl_filename} to {parquet_filename} ...")
+    df = ctx.read_csv(tbl_filename, has_header=False, 
file_extension=file_extension, delimiter="|")
+    df.write_parquet(parquet_filename)
+
+def generate_tpch(scale_factor: int, partitions: int):
+    start_time = time.time()
+    if partitions == 1:
+        command = f"docker run -v `pwd`/data:/data -t --rm 
ghcr.io/scalytics/tpch-docker:main -vf -s {scale_factor}"
+        run_and_log_output(command, "/tmp/tpchgen.log")
+
+        # convert to parquet
+        ctx = SessionContext()
+        for table in table_names:
+            convert_tbl_to_parquet(ctx, f"data/{table}.tbl", "tbl", 
f"data/{table}.parquet")
+
+    else:
+
+        max_threads = os.cpu_count()
+
+        # List of commands to run
+        commands = [
+            (f"docker run -v `pwd`/data:/data -t --rm 
ghcr.io/scalytics/tpch-docker:main -vf -s {scale_factor} -C {partitions} -S 
{part}",
+             f"/tmp/tpchgen-part{part}.log")
+            for part in range(1, partitions + 1)
+        ]
+
+        # run commands in parallel
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as 
executor:
+            futures = [executor.submit(run_and_log_output, command, log_file) 
for (command, log_file) in commands]
+
+            # wait for all futures to complete
+            for future in concurrent.futures.as_completed(futures):
+                try:
+                    future.result()
+                except Exception as e:
+                    print(f"Command failed with exception: {e}")
+
+        # convert to parquet
+        ctx = SessionContext()
+        for table in table_names:
+            run(f"mkdir -p data/{table}.parquet")
+            if table == "nation" or table == "region":
+                # nation and region are special cases and do not generate 
multiple files
+                convert_tbl_to_parquet(ctx, f"data/{table}.tbl", "tbl", 
f"data/{table}.parquet/part1.parquet")
+            else:
+                for part in range(1, partitions + 1):
+                    convert_tbl_to_parquet(ctx, f"data/{table}.tbl.{part}", 
f"tbl.{part}", f"data/{table}.parquet/part{part}.parquet")
+
+    end_time = time.time()
+    print(f"Finished in {round(end_time - start_time, 2)} seconds")
+
+if __name__ == '__main__':
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument('--scale-factor', type=int, help='The scale 
factor')
+    arg_parser.add_argument('--partitions', type=int, help='The number of 
partitions')
+    args = arg_parser.parse_args()
+    generate_tpch(args.scale_factor, args.partitions)
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to