This is an automated email from the ASF dual-hosted git repository.
iwasakims pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git
The following commit(s) were added to refs/heads/master by this push:
new 76c48c5af BIGTOP-4370. Integrate the bigpetstore-spark workflow with Airflow. (#1355)
76c48c5af is described below
commit 76c48c5af06b63007da176ff2ffcc5dd851e9213
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Jul 23 14:38:42 2025 +0900
BIGTOP-4370. Integrate the bigpetstore-spark workflow with Airflow. (#1355)
---
bigtop-bigpetstore/bigpetstore-spark/README.md | 45 +++++++++++++
bigtop-bigpetstore/bigpetstore-spark/build.gradle | 2 +-
.../bigpetstore-spark/images/dag_list.png | Bin 0 -> 245248 bytes
.../bigpetstore-spark/images/running_dag.png | Bin 0 -> 330673 bytes
.../bigpetstore-spark/images/trigger_dag.png | Bin 0 -> 182612 bytes
bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml | 1 +
.../puppet/modules/airflow/manifests/init.pp | 25 ++++++-
.../airflow/templates/example_bigpetstore.py | 72 +++++++++++++++++++++
bigtop-packages/src/common/airflow/airflow.default | 1 +
9 files changed, 144 insertions(+), 2 deletions(-)
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index e550e48f3..9cd1bdaf3 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -172,3 +172,48 @@ spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analy
```
The resulting json file will contain lists of customers, products, and products recommended to each customer.
+
+Airflow Integration
+--------------------------------------------
+
+The steps described above are consolidated into [a single Airflow DAG](../../bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py).
+You can try it as follows. The example here was tested on Debian 12, and assumes that [Puppet](../../bigtop_toolchain/bin/puppetize.sh) and the [Bigtop toolchain](../../bigtop_toolchain/README.md) are already installed, and that BPS Spark has been built as a shaded JAR in accordance with [the guide above](#building-and-running-with-spark).
+
+1. (Optional) Build Airflow and Spark, including their dependencies.
+ You can skip this step if you use Bigtop's binary distribution packages from its public repository.
+
+```
+$ ./gradlew allclean airflow-pkg spark-pkg repo -Dbuildwithdeps=true
+```
+
+2. Deploy the packages above through Bigtop's Puppet manifests with appropriate parameters:
+
+```
+$ cat bigtop-deploy/puppet/hieradata/site.yaml
+bigtop::bigtop_repo_gpg_check: false
+bigtop::bigtop_repo_uri: [...]
+bigtop::hadoop_head_node: ...
+hadoop::hadoop_storage_dirs: [/data]
+hadoop_cluster_node::cluster_components: [bigtop-utils, hdfs, yarn, spark, airflow]
+airflow::server::install_bigpetstore_example: true # Enable the BigPetStore DAG
+airflow::server::load_examples: false # Disable Airflow's default examples for simplicity
+$ sudo cp -r bigtop-deploy/puppet/hiera* /etc/puppet
+$ sudo puppet apply --hiera_config=/etc/puppet/hiera.yaml --modulepath=/vagrant_data/bigtop/bigtop-deploy/puppet/modules:/etc/puppet/code/modules:/usr/share/puppet/modules /vagrant_data/bigtop/bigtop-deploy/puppet/manifests
+```
+
+3. Create output directories on HDFS, owned by the airflow user:
+
+```
+$ sudo -u hdfs hdfs dfs -mkdir /user/airflow
+$ sudo -u hdfs hdfs dfs -chown airflow:airflow /user/airflow
+```
+
+4. Log in to Airflow's web UI with admin/admin and wait a while for the DAG to be picked up.
+ Once it appears, you can run it via the triangle (play) button on the right.
+ 
+
+5. Trigger the DAG with the path to the BPS Spark JAR.
+ 
+
+6. If the settings are correct, the BPS DAG should run successfully as follows:
+ 
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 970065b48..eae173eac 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -91,7 +91,7 @@ dependencies {
compile("org.apache.spark:spark-core_${scalaVersion}:${sparkVersion}")
compile("org.apache.spark:spark-mllib_${scalaVersion}:${sparkVersion}")
compile("org.apache.spark:spark-sql_${scalaVersion}:${sparkVersion}")
- compile "org.apache.bigtop:bigpetstore-data-generator:3.5.0-SNAPSHOT"
+ compile "org.apache.bigtop:bigpetstore-data-generator:3.6.0-SNAPSHOT"
compile "org.json4s:json4s-jackson_${scalaVersion}:3.6.12"
testCompile "junit:junit:4.13.2"
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png
new file mode 100644
index 000000000..3e18b441a
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png
new file mode 100644
index 000000000..b8dbde8a2
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png
new file mode 100644
index 000000000..9c18cb33c
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png differ
diff --git a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
index c27cc26ad..630ba9166 100644
--- a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
+++ b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
@@ -218,3 +218,4 @@ ranger::admin::admin_password: "Admin01234"
airflow::server::executor: "SequentialExecutor"
airflow::server::load_examples: "True"
airflow::server::sql_alchemy_conn: "sqlite:////var/lib/airflow/airflow.db"
+airflow::server::install_bigpetstore_example: "False"
diff --git a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
index 1c26b3964..0dd9cb15d 100644
--- a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
+++ b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
@@ -20,7 +20,7 @@ class airflow {
}
}
- class server($executor, $load_examples, $sql_alchemy_conn) {
+ class server($executor, $load_examples, $sql_alchemy_conn, $install_bigpetstore_example=False) {
package { 'airflow':
ensure => latest,
}
@@ -55,5 +55,28 @@ class airflow {
ensure => running,
require => Exec['airflow-db-init'],
}
+
+ if $install_bigpetstore_example {
+ exec { 'install-spark-provider':
+ command => "/usr/lib/airflow/bin/python3 -m pip install apache-airflow-providers-apache-spark 'pyspark<4'",
+ environment => ['AIRFLOW_HOME=/var/lib/airflow'],
+ user => 'root',
+ require => Package['airflow'],
+ }
+
+ file { '/var/lib/airflow/dags':
+ ensure => 'directory',
+ owner => 'airflow',
+ group => 'airflow',
+ require => Package['airflow'],
+ }
+
+ file { '/var/lib/airflow/dags/example_bigpetstore.py':
+ content => template('airflow/example_bigpetstore.py'),
+ owner => 'airflow',
+ group => 'airflow',
+ require => File['/var/lib/airflow/dags'],
+ }
+ }
}
}
diff --git a/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
new file mode 100644
index 000000000..1b8cb7f2a
--- /dev/null
+++ b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pendulum
+
+from airflow.models import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+with DAG(
+ "bigpetstore_dag",
+ params={"bigpetstore_jar_path": Param("bigpetstore-spark-3.6.0-all.jar")},
+ schedule=None,
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+) as dag:
+ clean_hdfs_task = BashOperator(
+ bash_command="hdfs dfs -rm -f -r generated_data transformed_data",
+ task_id="clean_hdfs_task",
+ )
+
+ clean_fs_task = BashOperator(
+ bash_command="rm -f /tmp/PetStoreStats.json /tmp/recommendations.json",
+ task_id="clean_fs_task",
+ )
+
+ generate_task = SparkSubmitOperator(
+ application="{{ params.bigpetstore_jar_path }}",
+ application_args=["generated_data", "10", "1000", "365", "345"],
+ java_class="org.apache.bigtop.bigpetstore.spark.generator.SparkDriver",
+ task_id="generate_task"
+ )
+
+ transform_task = SparkSubmitOperator(
+ application="{{ params.bigpetstore_jar_path }}",
+ application_args=["generated_data", "transformed_data"],
+ java_class="org.apache.bigtop.bigpetstore.spark.etl.SparkETL",
+ task_id="transform_task"
+ )
+
+ analyze_task = SparkSubmitOperator(
+ application="{{ params.bigpetstore_jar_path }}",
+ application_args=["transformed_data", "/tmp/PetStoreStats.json"],
+ java_class="org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics",
+ task_id="analyze_task"
+ )
+
+ recommend_task = SparkSubmitOperator(
+ application="{{ params.bigpetstore_jar_path }}",
+ application_args=["transformed_data", "/tmp/recommendations.json"],
+ java_class="org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts",
+ task_id="recommend_task"
+ )
+
+ [clean_hdfs_task, clean_fs_task] >> generate_task >> transform_task >> [analyze_task, recommend_task]
+
+if __name__ == "__main__":
+ dag.test()
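The dependency line `[clean_hdfs_task, clean_fs_task] >> generate_task >> transform_task >> [analyze_task, recommend_task]` relies on Airflow overloading the `>>` operator for fan-in and fan-out. A minimal sketch of those semantics with a stand-in class (not Airflow's implementation; task names shortened for illustration):

```python
class Task:
    """Minimal stand-in for an Airflow task, modeling the >> operator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # task_ids this task must wait for

    def __rshift__(self, other):
        # task >> other, or task >> [a, b]: the target(s) run after self.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            t.upstream.add(self.task_id)
        return other  # returning the right-hand side enables chaining

    def __rrshift__(self, other):
        # [a, b] >> task: task runs after every element of the list.
        for t in other:
            t >> self
        return self

clean_hdfs, clean_fs, generate, transform, analyze, recommend = (
    Task(n) for n in
    ["clean_hdfs", "clean_fs", "generate", "transform", "analyze", "recommend"]
)

# Same shape as the DAG in this commit: two cleanup tasks fan in to
# generate, then transform fans out to the two analytics tasks.
[clean_hdfs, clean_fs] >> generate >> transform >> [analyze, recommend]

print(sorted(generate.upstream))
print(sorted(analyze.upstream))
```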
diff --git a/bigtop-packages/src/common/airflow/airflow.default b/bigtop-packages/src/common/airflow/airflow.default
index 3c73385b5..7fb32a7ea 100644
--- a/bigtop-packages/src/common/airflow/airflow.default
+++ b/bigtop-packages/src/common/airflow/airflow.default
@@ -21,4 +21,5 @@
#
# AIRFLOW_CONFIG=
AIRFLOW_HOME=/var/lib/airflow
+HADOOP_CONF_DIR=/etc/hadoop/conf
PATH=/usr/lib/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin