This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 8c76051a prettier (#438)
8c76051a is described below
commit 8c76051a39a5f06275270ab8e0cbf620f3f2830b
Author: Andy Grove <[email protected]>
AuthorDate: Mon Oct 24 00:08:40 2022 -0600
prettier (#438)
---
benchmarks/README.md | 81 +++++-
benchmarks/spark/.gitignore | 1 +
benchmarks/spark/README.md | 96 ++++++++
benchmarks/spark/pom.xml | 104 ++++++++
.../main/scala/org/apache/arrow/SparkTpch.scala | 273 +++++++++++++++++++++
5 files changed, 545 insertions(+), 10 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 80f47e96..b39be9e2 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -44,7 +44,7 @@ to the `.gitignore` file.
Build the Python bindings and then run:
```bash
-$ python tpch.py --query q1 --path /mnt/bigdata/tpch/sf1-parquet/
+$ python tpch.py --query q1 --path /mnt/bigdata/tpch/sf1-parquet/
Registering table part at path /mnt/bigdata/tpch/sf1-parquet//part
Registering table supplier at path /mnt/bigdata/tpch/sf1-parquet//supplier
Registering table partsupp at path /mnt/bigdata/tpch/sf1-parquet//partsupp
@@ -56,7 +56,7 @@ Registering table region at path /mnt/bigdata/tpch/sf1-parquet//region
Query q1 took 9.668351173400879 second(s)
```
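When automating repeated runs, the timing line shown in the output above can be scraped rather than read by hand. A minimal sketch (the helper name and regex are illustrative, not part of the repo):

```python
import re

def parse_query_time(line: str) -> float:
    """Extract elapsed seconds from a line like 'Query q1 took 9.66 second(s)'."""
    m = re.search(r"Query \S+ took ([0-9.]+) second", line)
    if m is None:
        raise ValueError(f"not a timing line: {line!r}")
    return float(m.group(1))

print(parse_query_time("Query q1 took 9.668351173400879 second(s)"))
```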
-Note that this Python script currently only supports running against file formats than contain a schema
+Note that this Python script currently only supports running against file formats that contain a schema
definition (such as Parquet).
## Running the DataFusion Benchmarks in Rust
@@ -178,6 +178,67 @@ Query 1 iteration 0 took 1956.1 ms
Query 1 avg time: 1956.11 ms
```
+## Comparing Performance with Apache Spark
+
+We run benchmarks to compare performance with Spark to identify future areas of optimization. We publish the latest
+results in the top-level README.
+
+### Ballista
+
+Build with the `release-lto` profile.
+
+```bash
+cargo build --profile release-lto
+```
+
+Run the cluster.
+
+```bash
+./target/release-lto/ballista-scheduler
+./target/release-lto/ballista-executor -c 24
+```
+
+Run the benchmark.
+
+```bash
+./target/release-lto/tpch benchmark ballista \
+ --host localhost \
+ --port 50050 \
+ --path /mnt/bigdata/tpch/sf10-parquet-float/ \
+ --format parquet \
+ --iterations 1 \
+ --partitions 24 \
+ --query 1
+```
+
+### Spark
+
+Start the cluster.
+
+```bash
+./sbin/start-master.sh
+./sbin/start-worker.sh spark://ripper:7077
+```
+
+Run the benchmark.
+
+```bash
+$SPARK_HOME/bin/spark-submit \
+ --master spark://ripper:7077 \
+  --class org.apache.arrow.SparkTpch \
+ --conf spark.driver.memory=8G \
+ --num-executors=1 \
+ --conf spark.executor.memory=32G \
+ --conf spark.executor.cores=24 \
+ --conf spark.cores.max=24 \
+ target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+ tpch \
+ --input-path /mnt/bigdata/tpch/sf10-parquet-float/ \
+ --input-format parquet \
+  --query-path /path/to/arrow-ballista/benchmarks/queries \
+ --query 1
+```
+
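With one iteration time from each engine in hand, the comparison published in the top-level README is a simple ratio. A minimal sketch of summarizing results (all timings here are invented for illustration, not real benchmark numbers):

```python
def speedup(spark_ms: float, ballista_ms: float) -> float:
    """Ratio > 1.0 means Ballista completed the query faster than Spark."""
    return spark_ms / ballista_ms

# Hypothetical per-query timings in milliseconds (not real results)
spark = {"q1": 3200.0, "q3": 2100.0}
ballista = {"q1": 1956.1, "q3": 2400.0}

for q in sorted(spark):
    print(f"{q}: {speedup(spark[q], ballista[q]):.2f}x")
```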
## NYC Taxi Benchmark
These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set.
@@ -199,14 +260,14 @@ Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
## Running the Ballista Loadtest
```bash
- cargo run --bin tpch -- loadtest ballista-load
- --query-list 1,3,5,6,7,10,12,13
- --requests 200
- --concurrency 10
- --data-path /****
- --format parquet
- --host localhost
- --port 50050
+ cargo run --bin tpch -- loadtest ballista-load
+ --query-list 1,3,5,6,7,10,12,13
+ --requests 200
+ --concurrency 10
+ --data-path /****
+ --format parquet
+ --host localhost
+ --port 50050
--sql-path /***
--debug
```
diff --git a/benchmarks/spark/.gitignore b/benchmarks/spark/.gitignore
new file mode 100644
index 00000000..1de56593
--- /dev/null
+++ b/benchmarks/spark/.gitignore
@@ -0,0 +1 @@
+target
\ No newline at end of file
diff --git a/benchmarks/spark/README.md b/benchmarks/spark/README.md
new file mode 100644
index 00000000..9b2fe7fd
--- /dev/null
+++ b/benchmarks/spark/README.md
@@ -0,0 +1,96 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Spark Benchmarks
+
+These benchmarks exist so that we can measure relative performance between Ballista and
+Apache Spark.
+
+## Prerequisites
+
+- Download Apache Maven from https://maven.apache.org/download.cgi
+- Download Apache Spark 3.3.0 from https://spark.apache.org/downloads.html
+
+Untar these downloads and set the `MAVEN_HOME` and `SPARK_HOME` environment variables to point to the
+install location.
+
+## Build the benchmark JAR file
+
+```bash
+$MAVEN_HOME/bin/mvn package
+```
+
+## Generating TPC-H data
+
+Use https://crates.io/crates/tpctools
+
+## Converting TPC-H data from CSV to Parquet
+
+```bash
+$SPARK_HOME/bin/spark-submit --master spark://localhost:7077 \
+ --class org.apache.arrow.SparkTpch \
+ --conf spark.driver.memory=8G \
+ --num-executors=1 \
+ --conf spark.executor.memory=32G \
+ --conf spark.executor.cores=24 \
+ --conf spark.cores.max=24 \
+ target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+ convert-tpch \
+ --input-path /mnt/bigdata/tpch/sf10-csv/ \
+ --input-format tbl \
+ --output-path /mnt/bigdata/tpch/sf10-parquet/ \
+ --output-format parquet \
+ --partitions 24
+```
+
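The `tbl` input handled by the conversion job above is plain pipe-delimited text with no header row and a trailing delimiter on each record, which is why the job supplies an explicit schema instead of inferring one. A stdlib-only sketch of that layout (the sample row is invented for illustration):

```python
import csv
import io

# A fabricated nation-style row in TPC-H .tbl layout: pipe-delimited,
# no header, trailing '|' at the end of the record.
sample = "0|ALGERIA|0|haggle carefully final deposits|\n"

reader = csv.reader(io.StringIO(sample), delimiter="|")
row = next(reader)
# The trailing '|' produces an empty final field, which we drop.
fields = row[:-1]
print(fields)
```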
+## Submit the benchmark application to the cluster
+
+```bash
+$SPARK_HOME/bin/spark-submit --master spark://localhost:7077 \
+ --class org.apache.arrow.SparkTpch \
+ --conf spark.driver.memory=8G \
+ --num-executors=1 \
+ --conf spark.executor.memory=32G \
+ --conf spark.executor.cores=24 \
+ --conf spark.cores.max=24 \
+ target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+ tpch \
+ --input-path /mnt/bigdata/tpch/sf10-parquet-float/ \
+ --input-format parquet \
+ --query-path /path/to/arrow-ballista/benchmarks/queries \
+ --query 1
+```
+
+# Standalone Mode
+
+## Start a local Spark cluster in standalone mode
+
+```bash
+$SPARK_HOME/sbin/start-master.sh
+$SPARK_HOME/sbin/start-worker.sh spark://localhost:7077
+```
+
+Monitor progress via the Spark UI at http://localhost:8080
+
+## Shut down the cluster
+
+```bash
+$SPARK_HOME/sbin/stop-worker.sh
+$SPARK_HOME/sbin/stop-master.sh
+```
diff --git a/benchmarks/spark/pom.xml b/benchmarks/spark/pom.xml
new file mode 100644
index 00000000..60dc47d1
--- /dev/null
+++ b/benchmarks/spark/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>spark-tpch</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>Benchmarks derived from TPC-H</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <java.version>1.8</java.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>2.12.11</scala.version>
+ <spark.version>3.3.0</spark.version>
+ </properties>
+
+ <build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.5.3</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.rogach</groupId>
+ <artifactId>scallop_${scala.binary.version}</artifactId>
+ <version>3.5.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala b/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala
new file mode 100644
index 00000000..93bdac82
--- /dev/null
+++ b/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.arrow
+
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.rogach.scallop.{ScallopConf, Subcommand}
+
+import scala.collection.mutable.ListBuffer
+import scala.io.Source
+
+class Conf(args: Array[String]) extends ScallopConf(args) {
+ val convertTpch = new Subcommand("convert-tpch") {
+ val inputPath = opt[String](required = true)
+ val inputFormat = opt[String](required = true)
+ val outputPath = opt[String](required = true)
+ val outputFormat = opt[String](required = true)
+ val partitions = opt[Int](required = true)
+ }
+ val tpch = new Subcommand("tpch") {
+ val inputPath = opt[String](required = true)
+ val queryPath = opt[String](required = true)
+ val inputFormat = opt[String](required = true)
+ val query = opt[String](required = true)
+ val iterations = opt[Int](required = false, default = Some(1))
+ }
+ addSubcommand(convertTpch)
+ addSubcommand(tpch)
+ requireSubcommand()
+ verify()
+}
+
+object SparkTpch {
+
+ def main(args: Array[String]): Unit = {
+ val conf = new Conf(args)
+
+ val spark: SparkSession = SparkSession.builder
+ .appName("Ballista Spark Benchmarks")
+ .getOrCreate()
+
+ conf.subcommand match {
+ case Some(conf.tpch) =>
+ // register tables
+ for (table <- Tpch.tables) {
+ val df = readTable(
+ conf,
+ spark,
+ table,
+ conf.tpch.inputPath(),
+ conf.tpch.inputFormat()
+ )
+ df.createTempView(table)
+ }
+
+ val sqlFile = s"${conf.tpch.queryPath()}/q${conf.tpch.query()}.sql"
+ val source = Source.fromFile(sqlFile)
+ val sql = source.getLines.mkString("\n")
+ source.close()
+ println(sql)
+
+ val timing = new ListBuffer[Long]()
+ for (i <- 0 until conf.tpch.iterations()) {
+ println(s"Iteration $i")
+ val start = System.currentTimeMillis()
+ val resultDf = spark.sql(sql)
+ val results = resultDf.collect()
+
+ val duration = System.currentTimeMillis() - start
+ // show results
+ println(s"Returned ${results.size} rows:")
+ results.foreach(println)
+
+ resultDf.explain(true)
+ println(s"Iteration $i took $duration ms")
+ timing += duration
+ }
+
+ // summarize the results
+ timing.zipWithIndex.foreach {
+ case (n, i) => println(s"Iteration $i took $n ms")
+ }
+
+ case Some(conf.`convertTpch`) =>
+ for (table <- Tpch.tables) {
+ val df = readTable(
+ conf,
+ spark,
+ table,
+ conf.convertTpch.inputPath(),
+ conf.convertTpch.inputFormat()
+ )
+
+ conf.convertTpch.outputFormat() match {
+ case "parquet" =>
+ val path = s"${conf.convertTpch.outputPath()}/${table}"
+ df.repartition(conf.convertTpch.partitions())
+ .write
+ .mode(SaveMode.Overwrite)
+ .parquet(path)
+ case "csv" =>
+ val path = s"${conf.convertTpch.outputPath()}/${table}.csv"
+ df.repartition(conf.convertTpch.partitions())
+ .write
+ .mode(SaveMode.Overwrite)
+ .csv(path)
+ case _ =>
+ throw new IllegalArgumentException("unsupported output format")
+ }
+ }
+
+ case _ =>
+ throw new IllegalArgumentException("no subcommand specified")
+ }
+ }
+
+ private def readTable(
+ conf: Conf,
+ spark: SparkSession,
+ tableName: String,
+ inputPath: String,
+ inputFormat: String
+ ): DataFrame = {
+ inputFormat match {
+ case "tbl" =>
+ val path = s"${inputPath}/${tableName}.tbl"
+ spark.read
+ .option("header", "false")
+ .option("inferSchema", "false")
+ .option("delimiter", "|")
+ .schema(Tpch.tableSchema(tableName))
+ .csv(path)
+ case "csv" =>
+ val path = s"${inputPath}/${tableName}.csv"
+ spark.read
+ .option("header", "false")
+ .option("inferSchema", "false")
+ .schema(Tpch.tableSchema(tableName))
+ .csv(path)
+ case "parquet" =>
+ val path = s"${inputPath}/${tableName}"
+ spark.read
+ .parquet(path)
+ case _ =>
+ throw new IllegalArgumentException("unsupported input format")
+ }
+ }
+}
+
+object Tpch {
+
+ val tables = Seq(
+ "part",
+ "supplier",
+ "partsupp",
+ "customer",
+ "orders",
+ "lineitem",
+ "nation",
+ "region"
+ )
+
+ def tableSchema(tableName: String) = {
+
+ // TODO should be using DecimalType here
+ //val decimalType = DataTypes.createDecimalType(15, 2)
+ val decimalType = DataTypes.DoubleType
+
+ tableName match {
+ case "part" => new StructType(Array(
+ StructField("p_partkey", DataTypes.LongType, false),
+ StructField("p_name", DataTypes.StringType, false),
+ StructField("p_mfgr", DataTypes.StringType, false),
+ StructField("p_brand", DataTypes.StringType, false),
+ StructField("p_type", DataTypes.StringType, false),
+ StructField("p_size", DataTypes.IntegerType, false),
+ StructField("p_container", DataTypes.StringType, false),
+ StructField("p_retailprice", decimalType, false),
+ StructField("p_comment", DataTypes.StringType, false)
+ ))
+
+ case "supplier" => new StructType(Array(
+ StructField("s_suppkey", DataTypes.LongType, false),
+ StructField("s_name", DataTypes.StringType, false),
+ StructField("s_address", DataTypes.StringType, false),
+ StructField("s_nationkey", DataTypes.LongType, false),
+ StructField("s_phone", DataTypes.StringType, false),
+ StructField("s_acctbal", decimalType, false),
+ StructField("s_comment", DataTypes.StringType, false)
+ ))
+
+ case "partsupp" => new StructType(Array(
+ StructField("ps_partkey", DataTypes.LongType, false),
+ StructField("ps_suppkey", DataTypes.LongType, false),
+ StructField("ps_availqty", DataTypes.IntegerType, false),
+ StructField("ps_supplycost", decimalType, false),
+ StructField("ps_comment", DataTypes.StringType, false)
+ ))
+
+ case "customer" => new StructType(Array(
+ StructField("c_custkey", DataTypes.LongType, false),
+ StructField("c_name", DataTypes.StringType, false),
+ StructField("c_address", DataTypes.StringType, false),
+ StructField("c_nationkey", DataTypes.LongType, false),
+ StructField("c_phone", DataTypes.StringType, false),
+ StructField("c_acctbal", decimalType, false),
+ StructField("c_mktsegment", DataTypes.StringType, false),
+ StructField("c_comment", DataTypes.StringType, false)
+ ))
+
+ case "orders" => new StructType(Array(
+ StructField("o_orderkey", DataTypes.LongType, false),
+ StructField("o_custkey", DataTypes.LongType, false),
+ StructField("o_orderstatus", DataTypes.StringType, false),
+ StructField("o_totalprice", decimalType, false),
+ StructField("o_orderdate", DataTypes.DateType, false),
+ StructField("o_orderpriority", DataTypes.StringType, false),
+ StructField("o_clerk", DataTypes.StringType, false),
+ StructField("o_shippriority", DataTypes.IntegerType, false),
+ StructField("o_comment", DataTypes.StringType, false)
+ ))
+
+ case "lineitem" => new StructType(Array(
+ StructField("l_orderkey", DataTypes.LongType, false),
+ StructField("l_partkey", DataTypes.LongType, false),
+ StructField("l_suppkey", DataTypes.LongType, false),
+ StructField("l_linenumber", DataTypes.IntegerType, false),
+ StructField("l_quantity", decimalType, false),
+ StructField("l_extendedprice", decimalType, false),
+ StructField("l_discount", decimalType, false),
+ StructField("l_tax", decimalType, false),
+ StructField("l_returnflag", DataTypes.StringType, false),
+ StructField("l_linestatus", DataTypes.StringType, false),
+ StructField("l_shipdate", DataTypes.DateType, false),
+ StructField("l_commitdate", DataTypes.DateType, false),
+ StructField("l_receiptdate", DataTypes.DateType, false),
+ StructField("l_shipinstruct", DataTypes.StringType, false),
+ StructField("l_shipmode", DataTypes.StringType, false),
+ StructField("l_comment", DataTypes.StringType, false)
+ ))
+
+ case "nation" => new StructType(Array(
+ StructField("n_nationkey", DataTypes.LongType, false),
+ StructField("n_name", DataTypes.StringType, false),
+ StructField("n_regionkey", DataTypes.LongType, false),
+ StructField("n_comment", DataTypes.StringType, false)
+ ))
+
+ case "region" => new StructType(Array(
+ StructField("r_regionkey", DataTypes.LongType, false),
+ StructField("r_name", DataTypes.StringType, false),
+ StructField("r_comment", DataTypes.StringType, false)
+ ))
+
+      case _ => throw new UnsupportedOperationException(tableName)
+ }
+ }
+}