This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c76051a prettier (#438)
8c76051a is described below

commit 8c76051a39a5f06275270ab8e0cbf620f3f2830b
Author: Andy Grove <[email protected]>
AuthorDate: Mon Oct 24 00:08:40 2022 -0600

    prettier (#438)
---
 benchmarks/README.md                               |  81 +++++-
 benchmarks/spark/.gitignore                        |   1 +
 benchmarks/spark/README.md                         |  96 ++++++++
 benchmarks/spark/pom.xml                           | 104 ++++++++
 .../main/scala/org/apache/arrow/SparkTpch.scala    | 273 +++++++++++++++++++++
 5 files changed, 545 insertions(+), 10 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 80f47e96..b39be9e2 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -44,7 +44,7 @@ to the `.gitignore` file.
 Build the Python bindings and then run:
 
 ```bash
-$ python tpch.py --query q1 --path /mnt/bigdata/tpch/sf1-parquet/ 
+$ python tpch.py --query q1 --path /mnt/bigdata/tpch/sf1-parquet/
 Registering table part at path /mnt/bigdata/tpch/sf1-parquet//part
 Registering table supplier at path /mnt/bigdata/tpch/sf1-parquet//supplier
 Registering table partsupp at path /mnt/bigdata/tpch/sf1-parquet//partsupp
@@ -56,7 +56,7 @@ Registering table region at path /mnt/bigdata/tpch/sf1-parquet//region
 Query q1 took 9.668351173400879 second(s)
 ```
 
-Note that this Python script currently only supports running against file formats than contain a schema 
+Note that this Python script currently only supports running against file formats that contain a schema
 definition (such as Parquet).
 
 ## Running the DataFusion Benchmarks in Rust
@@ -178,6 +178,67 @@ Query 1 iteration 0 took 1956.1 ms
 Query 1 avg time: 1956.11 ms
 ```
 
+## Comparing Performance with Apache Spark
+
+We run benchmarks to compare performance with Spark to identify future areas of optimization. We publish the latest
+results in the top-level README.
+
+### Ballista
+
+Build with the `release-lto` profile.
+
+```bash
+cargo build --profile release-lto
+```
+
+Start the cluster.
+
+```bash
+./target/release-lto/ballista-scheduler
+./target/release-lto/ballista-executor -c 24
+```
+
+Run the benchmark.
+
+```bash
+./target/release-lto/tpch benchmark ballista \
+    --host localhost \
+    --port 50050 \
+    --path /mnt/bigdata/tpch/sf10-parquet-float/ \
+    --format parquet \
+    --iterations 1 \
+    --partitions 24 \
+    --query 1
+```
+
+### Spark
+
+Start the cluster.
+
+```bash
+./sbin/start-master.sh
+./sbin/start-worker.sh spark://ripper:7077
+```
+
+Run the benchmark.
+
+```bash
+$SPARK_HOME/bin/spark-submit \
+    --master spark://ripper:7077 \
+    --class org.apache.arrow.SparkTpch \
+    --conf spark.driver.memory=8G \
+    --num-executors=1 \
+    --conf spark.executor.memory=32G \
+    --conf spark.executor.cores=24 \
+    --conf spark.cores.max=24 \
+    target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+    tpch \
+    --input-path /mnt/bigdata/tpch/sf10-parquet-float/ \
+    --input-format parquet \
+    --query-path /home/andy/git/apache/arrow-ballista/benchmarks/queries \
+    --query 1
+```
+
 ## NYC Taxi Benchmark
 
 These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set.
@@ -199,14 +260,14 @@ Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
 ## Running the Ballista Loadtest
 
 ```bash
- cargo run --bin tpch -- loadtest  ballista-load 
-  --query-list 1,3,5,6,7,10,12,13 
-  --requests 200 
-  --concurrency 10  
-  --data-path /**** 
-  --format parquet 
-  --host localhost 
-  --port 50050 
+ cargo run --bin tpch -- loadtest  ballista-load
+  --query-list 1,3,5,6,7,10,12,13
+  --requests 200
+  --concurrency 10
+  --data-path /****
+  --format parquet
+  --host localhost
+  --port 50050
   --sql-path /***
   --debug
 ```
diff --git a/benchmarks/spark/.gitignore b/benchmarks/spark/.gitignore
new file mode 100644
index 00000000..1de56593
--- /dev/null
+++ b/benchmarks/spark/.gitignore
@@ -0,0 +1 @@
+target
\ No newline at end of file
diff --git a/benchmarks/spark/README.md b/benchmarks/spark/README.md
new file mode 100644
index 00000000..9b2fe7fd
--- /dev/null
+++ b/benchmarks/spark/README.md
@@ -0,0 +1,96 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Spark Benchmarks
+
+These benchmarks exist so that we can measure relative performance between Ballista and
+Apache Spark.
+
+## Prerequisites
+
+- Download Apache Maven from https://maven.apache.org/download.cgi
+- Download Apache Spark 3.3.0 from https://spark.apache.org/downloads.html
+
+Untar these downloads and set the `MAVEN_HOME` and `SPARK_HOME` environment variables to point to the
+install locations.
+
+## Build the benchmark JAR file
+
+```bash
+$MAVEN_HOME/bin/mvn package
+```
+
+## Generating TPC-H data
+
+Use https://crates.io/crates/tpctools
+
+## Converting TPC-H data from CSV to Parquet
+
+```bash
+$SPARK_HOME/bin/spark-submit --master spark://localhost:7077 \
+    --class org.apache.arrow.SparkTpch \
+    --conf spark.driver.memory=8G \
+    --num-executors=1 \
+    --conf spark.executor.memory=32G \
+    --conf spark.executor.cores=24 \
+    --conf spark.cores.max=24 \
+    target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+    convert-tpch \
+    --input-path /mnt/bigdata/tpch/sf10-csv/ \
+    --input-format tbl \
+    --output-path /mnt/bigdata/tpch/sf10-parquet/ \
+    --output-format parquet \
+    --partitions 24
+```
+
+## Submit the benchmark application to the cluster
+
+```bash
+$SPARK_HOME/bin/spark-submit --master spark://localhost:7077 \
+    --class org.apache.arrow.SparkTpch \
+    --conf spark.driver.memory=8G \
+    --num-executors=1 \
+    --conf spark.executor.memory=32G \
+    --conf spark.executor.cores=24 \
+    --conf spark.cores.max=24 \
+    target/spark-tpch-0.5.0-SNAPSHOT-jar-with-dependencies.jar \
+    tpch \
+    --input-path /mnt/bigdata/tpch/sf10-parquet-float/ \
+    --input-format parquet \
+    --query-path /path/to/arrow-ballista/benchmarks/queries \
+    --query 1
+```
+
+# Standalone Mode
+
+## Start a local Spark cluster in standalone mode
+
+```bash
+$SPARK_HOME/sbin/start-master.sh
+$SPARK_HOME/sbin/start-worker.sh spark://localhost:7077
+```
+
+Monitor progress via the Spark UI at http://localhost:8080
+
+## Shut down the cluster
+
+```bash
+$SPARK_HOME/sbin/stop-worker.sh
+$SPARK_HOME/sbin/stop-master.sh
+```
diff --git a/benchmarks/spark/pom.xml b/benchmarks/spark/pom.xml
new file mode 100644
index 00000000..60dc47d1
--- /dev/null
+++ b/benchmarks/spark/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>spark-tpch</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Benchmarks derived from TPC-H</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <java.version>1.8</java.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <scala.version>2.12.11</scala.version>
+        <spark.version>3.3.0</spark.version>
+    </properties>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>4.5.3</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.rogach</groupId>
+            <artifactId>scallop_${scala.binary.version}</artifactId>
+            <version>3.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala b/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala
new file mode 100644
index 00000000..93bdac82
--- /dev/null
+++ b/benchmarks/spark/src/main/scala/org/apache/arrow/SparkTpch.scala
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.arrow
+
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.rogach.scallop.{ScallopConf, Subcommand}
+
+import scala.collection.mutable.ListBuffer
+import scala.io.Source
+
+class Conf(args: Array[String]) extends ScallopConf(args) {
+  val convertTpch = new Subcommand("convert-tpch") {
+    val inputPath = opt[String](required = true)
+    val inputFormat = opt[String](required = true)
+    val outputPath = opt[String](required = true)
+    val outputFormat = opt[String](required = true)
+    val partitions = opt[Int](required = true)
+  }
+  val tpch = new Subcommand("tpch") {
+    val inputPath = opt[String](required = true)
+    val queryPath = opt[String](required = true)
+    val inputFormat = opt[String](required = true)
+    val query = opt[String](required = true)
+    val iterations = opt[Int](required = false, default = Some(1))
+  }
+  addSubcommand(convertTpch)
+  addSubcommand(tpch)
+  requireSubcommand()
+  verify()
+}
+
+object SparkTpch {
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Conf(args)
+
+    val spark: SparkSession = SparkSession.builder
+      .appName("Ballista Spark Benchmarks")
+      .getOrCreate()
+
+    conf.subcommand match {
+      case Some(conf.tpch) =>
+        // register tables
+        for (table <- Tpch.tables) {
+          val df = readTable(
+            conf,
+            spark,
+            table,
+            conf.tpch.inputPath(),
+            conf.tpch.inputFormat()
+          )
+          df.createTempView(table)
+        }
+
+        val sqlFile = s"${conf.tpch.queryPath()}/q${conf.tpch.query()}.sql"
+        val source = Source.fromFile(sqlFile)
+        val sql = source.getLines.mkString("\n")
+        source.close()
+        println(sql)
+
+        val timing = new ListBuffer[Long]()
+        for (i <- 0 until conf.tpch.iterations()) {
+          println(s"Iteration $i")
+          val start = System.currentTimeMillis()
+          val resultDf = spark.sql(sql)
+          val results = resultDf.collect()
+
+          val duration = System.currentTimeMillis() - start
+          // show results
+          println(s"Returned ${results.size} rows:")
+          results.foreach(println)
+
+          resultDf.explain(true)
+          println(s"Iteration $i took $duration ms")
+          timing += duration
+        }
+
+        // summarize the results
+        timing.zipWithIndex.foreach {
+          case (n, i) => println(s"Iteration $i took $n ms")
+        }
+
+      case Some(conf.`convertTpch`) =>
+        for (table <- Tpch.tables) {
+          val df = readTable(
+            conf,
+            spark,
+            table,
+            conf.convertTpch.inputPath(),
+            conf.convertTpch.inputFormat()
+          )
+
+          conf.convertTpch.outputFormat() match {
+            case "parquet" =>
+              val path = s"${conf.convertTpch.outputPath()}/${table}"
+              df.repartition(conf.convertTpch.partitions())
+                .write
+                .mode(SaveMode.Overwrite)
+                .parquet(path)
+            case "csv" =>
+              val path = s"${conf.convertTpch.outputPath()}/${table}.csv"
+              df.repartition(conf.convertTpch.partitions())
+                .write
+                .mode(SaveMode.Overwrite)
+                .csv(path)
+            case _ =>
+              throw new IllegalArgumentException("unsupported output format")
+          }
+        }
+
+      case _ =>
+        throw new IllegalArgumentException("no subcommand specified")
+    }
+  }
+
+  private def readTable(
+      conf: Conf,
+      spark: SparkSession,
+      tableName: String,
+      inputPath: String,
+      inputFormat: String
+  ): DataFrame = {
+    inputFormat match {
+      case "tbl" =>
+        val path = s"${inputPath}/${tableName}.tbl"
+        spark.read
+          .option("header", "false")
+          .option("inferSchema", "false")
+          .option("delimiter", "|")
+          .schema(Tpch.tableSchema(tableName))
+          .csv(path)
+      case "csv" =>
+        val path = s"${inputPath}/${tableName}.csv"
+        spark.read
+          .option("header", "false")
+          .option("inferSchema", "false")
+          .schema(Tpch.tableSchema(tableName))
+          .csv(path)
+      case "parquet" =>
+        val path = s"${inputPath}/${tableName}"
+        spark.read
+          .parquet(path)
+      case _ =>
+        throw new IllegalArgumentException("unsupported input format")
+    }
+  }
+}
+
+object Tpch {
+
+  val tables = Seq(
+    "part",
+    "supplier",
+    "partsupp",
+    "customer",
+    "orders",
+    "lineitem",
+    "nation",
+    "region"
+  )
+
+  def tableSchema(tableName: String) = {
+
+    // TODO should be using DecimalType here
+    //val decimalType = DataTypes.createDecimalType(15, 2)
+    val decimalType = DataTypes.DoubleType
+
+    tableName match {
+      case "part" => new StructType(Array(
+            StructField("p_partkey", DataTypes.LongType, false),
+            StructField("p_name", DataTypes.StringType, false),
+            StructField("p_mfgr", DataTypes.StringType, false),
+            StructField("p_brand", DataTypes.StringType, false),
+            StructField("p_type", DataTypes.StringType, false),
+            StructField("p_size", DataTypes.IntegerType, false),
+            StructField("p_container", DataTypes.StringType, false),
+            StructField("p_retailprice", decimalType, false),
+            StructField("p_comment", DataTypes.StringType, false)
+      ))
+
+      case "supplier" => new StructType(Array(
+          StructField("s_suppkey", DataTypes.LongType, false),
+          StructField("s_name", DataTypes.StringType, false),
+          StructField("s_address", DataTypes.StringType, false),
+          StructField("s_nationkey", DataTypes.LongType, false),
+          StructField("s_phone", DataTypes.StringType, false),
+          StructField("s_acctbal", decimalType, false),
+          StructField("s_comment", DataTypes.StringType, false)
+      ))
+
+      case "partsupp" => new StructType(Array(
+          StructField("ps_partkey", DataTypes.LongType, false),
+          StructField("ps_suppkey", DataTypes.LongType, false),
+          StructField("ps_availqty", DataTypes.IntegerType, false),
+          StructField("ps_supplycost", decimalType, false),
+          StructField("ps_comment", DataTypes.StringType, false)
+      ))
+
+      case "customer" => new StructType(Array(
+          StructField("c_custkey", DataTypes.LongType, false),
+          StructField("c_name", DataTypes.StringType, false),
+          StructField("c_address", DataTypes.StringType, false),
+          StructField("c_nationkey", DataTypes.LongType, false),
+          StructField("c_phone", DataTypes.StringType, false),
+          StructField("c_acctbal", decimalType, false),
+          StructField("c_mktsegment", DataTypes.StringType, false),
+          StructField("c_comment", DataTypes.StringType, false)
+      ))
+
+      case "orders" => new StructType(Array(
+          StructField("o_orderkey", DataTypes.LongType, false),
+          StructField("o_custkey", DataTypes.LongType, false),
+          StructField("o_orderstatus", DataTypes.StringType, false),
+          StructField("o_totalprice", decimalType, false),
+          StructField("o_orderdate", DataTypes.DateType, false),
+          StructField("o_orderpriority", DataTypes.StringType, false),
+          StructField("o_clerk", DataTypes.StringType, false),
+          StructField("o_shippriority", DataTypes.IntegerType, false),
+          StructField("o_comment", DataTypes.StringType, false)
+      ))
+
+      case "lineitem" => new StructType(Array(
+          StructField("l_orderkey", DataTypes.LongType, false),
+          StructField("l_partkey", DataTypes.LongType, false),
+          StructField("l_suppkey", DataTypes.LongType, false),
+          StructField("l_linenumber", DataTypes.IntegerType, false),
+          StructField("l_quantity", decimalType, false),
+          StructField("l_extendedprice", decimalType, false),
+          StructField("l_discount", decimalType, false),
+          StructField("l_tax", decimalType, false),
+          StructField("l_returnflag", DataTypes.StringType, false),
+          StructField("l_linestatus", DataTypes.StringType, false),
+          StructField("l_shipdate", DataTypes.DateType, false),
+          StructField("l_commitdate", DataTypes.DateType, false),
+          StructField("l_receiptdate", DataTypes.DateType, false),
+          StructField("l_shipinstruct", DataTypes.StringType, false),
+          StructField("l_shipmode", DataTypes.StringType, false),
+          StructField("l_comment", DataTypes.StringType, false)
+      ))
+
+      case "nation" => new StructType(Array(
+          StructField("n_nationkey", DataTypes.LongType, false),
+          StructField("n_name", DataTypes.StringType, false),
+          StructField("n_regionkey", DataTypes.LongType, false),
+          StructField("n_comment", DataTypes.StringType, false)
+      ))
+
+      case "region" => new StructType(Array(
+          StructField("r_regionkey", DataTypes.LongType, false),
+          StructField("r_name", DataTypes.StringType, false),
+          StructField("r_comment", DataTypes.StringType, false)
+      ))
+
+      case _ => throw new UnsupportedOperationException(tableName)
+    }
+  }
+}
