This is an automated email from the ASF dual-hosted git repository.
kamir pushed a commit to branch kamir-patch-2
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/kamir-patch-2 by this push:
new dfe309ac finished WordCount with KafkaSource and KafkaSink
dfe309ac is described below
commit dfe309acebb6131ebee04e28971b0c34abe48c2f
Author: Mirko Kämpf <[email protected]>
AuthorDate: Mon Sep 2 20:59:25 2024 +0200
finished WordCount with KafkaSource and KafkaSink
---
.../scala/org/apache/wayang/api/DataQuanta.scala | 4 +-
wayang-applications/bin/bootstrap.sh | 52 +++++++++
.../bin/{run_wordcount.sh => cleanup.sh} | 18 +--
wayang-applications/bin/run_wordcount.sh | 2 +-
wayang-applications/bin/run_wordcount_kafka.sh | 2 +-
wayang-applications/pom.xml | 4 +-
.../wayang/applications/WordCountOnKafkaTopic.java | 126 +++++++++++++++++++++
.../wayang/basic/operators/KafkaTopicSink.java | 1 +
.../spark/operators/SparkKafkaTopicSource.java | 2 +-
9 files changed, 193 insertions(+), 18 deletions(-)
diff --git
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 80ae6742..0dd247a3 100644
---
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -966,7 +966,8 @@ class DataQuanta[Out: ClassTag](val operator:
ElementaryOperator, outputIndex: I
topicName,
new TransformationDescriptor(formatterUdf, basicDataUnitType[Out],
basicDataUnitType[String], udfLoad)
)
- sink.setName(s"Write to KafkaTopic $topicName")
+ sink.setName(s"*#-> Write to KafkaTopic $topicName")
+ println(s"*#-> Write to KafkaTopic $topicName")
this.connectTo(sink, 0)
// Do the execution.
@@ -991,6 +992,7 @@ class DataQuanta[Out: ClassTag](val operator:
ElementaryOperator, outputIndex: I
new TransformationDescriptor(formatterUdf, basicDataUnitType[Out],
basicDataUnitType[String], udfLoad)
)
sink.setName(s"Write to $url")
+
this.connectTo(sink, 0)
// Do the execution.
diff --git a/wayang-applications/bin/bootstrap.sh
b/wayang-applications/bin/bootstrap.sh
new file mode 100755
index 00000000..cdf2a4d7
--- /dev/null
+++ b/wayang-applications/bin/bootstrap.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# (Re-)creates the Kafka topics used by the Wayang Kafka demos on a Confluent
+# Cloud cluster, then lists the cluster's topics and the Schema Registry subjects.
+#
+# One-time setup (macOS):
+#   brew install confluentinc/tap/cli
+#   brew install jq
+#   brew install git-lfs
+#   confluent login
+#
+# Environment:
+#   CLUSTER_ID                            Kafka cluster id (default: lkc-m2kpj2)
+#   SCHEMA_REGISTRY_URL                   Schema Registry endpoint (default below)
+#   SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO  "<api-key>:<api-secret>"; the subject
+#                                         listing is skipped when it is unset.
+#                                         Never commit real credentials to this file.
+
+# Topic names; exported so they are visible to child processes
+# (and to the calling shell when this file is sourced).
+export topic_l1_a=region_emea_counts
+export topic_l1_b=region_apac_counts
+export topic_l1_c=region_uswest_counts
+export topic_l2_a=global_contribution
+export topic_l2_b=global_averages
+
+CLUSTER_ID="${CLUSTER_ID:-lkc-m2kpj2}"
+
+# Pass the variables' values (e.g. region_emea_counts) to the CLI, not their names.
+topics=("$topic_l1_a" "$topic_l1_b" "$topic_l1_c" "$topic_l2_a" "$topic_l2_b")
+
+# Delete first so every run starts from freshly created topics.
+for topic in "${topics[@]}"; do
+  confluent kafka topic delete "$topic" --cluster "$CLUSTER_ID"
+done
+
+for topic in "${topics[@]}"; do
+  confluent kafka topic create "$topic" --cluster "$CLUSTER_ID"
+done
+
+######################################################################################################
+# https://docs.confluent.io/cloud/current/sr/schema_registry_ccloud_tutorial.html
+export SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-https://psrc-lo5k9.eu-central-1.aws.confluent.cloud}"
+
+confluent kafka topic list --cluster "$CLUSTER_ID"
+
+###
+# List schemas. The credentials must come from the environment.
+if [ -z "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO:-}" ]; then
+  echo "SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO is not set; skipping schema listing." >&2
+else
+  curl --silent -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects" | jq .
+fi
diff --git a/wayang-applications/bin/run_wordcount.sh
b/wayang-applications/bin/cleanup.sh
similarity index 67%
copy from wayang-applications/bin/run_wordcount.sh
copy to wayang-applications/bin/cleanup.sh
index 87b026a9..a81a432c 100755
--- a/wayang-applications/bin/run_wordcount.sh
+++ b/wayang-applications/bin/cleanup.sh
@@ -14,18 +14,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-
-export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
-
-cd ..
-cd ..
-
-#mvn clean compile package install -pl :wayang-assembly -Pdistribution
-DskipTests
-cd wayang-applications
-mvn compile package install -DskipTests
-cd ..
+confluent kafka topic delete topic_l1_a --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l1_b --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l1_c --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l2_a --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l2_b --cluster lkc-m2kpj2
-source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
+confluent kafka topic list --cluster lkc-m2kpj2
\ No newline at end of file
diff --git a/wayang-applications/bin/run_wordcount.sh
b/wayang-applications/bin/run_wordcount.sh
index 87b026a9..9f1a26e8 100755
--- a/wayang-applications/bin/run_wordcount.sh
+++ b/wayang-applications/bin/run_wordcount.sh
@@ -21,7 +21,7 @@ export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
cd ..
cd ..
-#mvn clean compile package install -pl :wayang-assembly -Pdistribution
-DskipTests
+mvn clean compile package install -pl :wayang-assembly -Pdistribution
-DskipTests
cd wayang-applications
mvn compile package install -DskipTests
diff --git a/wayang-applications/bin/run_wordcount_kafka.sh
b/wayang-applications/bin/run_wordcount_kafka.sh
index 87b026a9..89b2ff1b 100755
--- a/wayang-applications/bin/run_wordcount_kafka.sh
+++ b/wayang-applications/bin/run_wordcount_kafka.sh
@@ -28,4 +28,4 @@ mvn compile package install -DskipTests
cd ..
-source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
+source ./.env.sh; bin/wayang-submit
org.apache.wayang.applications.WordCountOnKafkaTopic
\ No newline at end of file
diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml
index 756b6b20..3dab581e 100644
--- a/wayang-applications/pom.xml
+++ b/wayang-applications/pom.xml
@@ -83,8 +83,8 @@
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java_2.12</artifactId>
<version>${WAYANG_VERSION}</version>
- <!--scope>system</scope-->
-
<!--systemPath>/Users/mkaempf/GITHUB.private/kamir-incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-SNAPSHOT-incubating-dist/wayang-0.7.1-SNAPSHOT/jars/wayang-api-scala-java_2.12-0.7.1-SNAPSHOT.jar</systemPath-->
+ <scope>system</scope>
+
<systemPath>/Users/kamir/GITHUB.merge/incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1/jars/wayang-api-scala-java-0.7.1.jar</systemPath>
</dependency>
<!--dependency>
<groupId>org.apache.wayang</groupId>
diff --git
a/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
new file mode 100644
index 00000000..03d3096f
--- /dev/null
+++
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.java.Java;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Locale;
+
+// Import the Logger class
+import org.apache.log4j.Logger;
+
+
+/**
+ * Word count over the records of a Kafka topic, executed on the Wayang Java platform.
+ *
+ * <p>Each record value is split into words on non-word characters. Empty tokens are dropped,
+ * words are lower-cased and counted, and one {@code "<count>, <word>"} record per distinct
+ * word is written to an output Kafka topic.
+ *
+ * <p>Usage: {@code WordCountOnKafkaTopic [inputTopic [outputTopic]]}. Omitted arguments fall
+ * back to {@link #DEFAULT_INPUT_TOPIC} and {@link #DEFAULT_OUTPUT_TOPIC}.
+ */
+public class WordCountOnKafkaTopic {
+
+    /** Topic read when no first program argument is given. */
+    private static final String DEFAULT_INPUT_TOPIC = "banking-tx-small-csv";
+
+    /** Topic written when no second program argument is given. */
+    private static final String DEFAULT_OUTPUT_TOPIC =
+            "word_count_contribution___banking-tx-small-csv";
+
+    /** Job name handed to the Kafka sink. */
+    private static final String SINK_JOB_NAME = "job_test_1";
+
+    // Create a logger instance
+    private static final Logger logger = Logger.getLogger(WordCountOnKafkaTopic.class);
+
+    /**
+     * Formats a (word, count) pair as one output record, e.g. {@code "3, kafka"}. This is the
+     * formatter passed to the Kafka sink, so the record layout is defined in exactly one place.
+     */
+    private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf =
+            tuple -> String.format("%d, %s", tuple.getField1(), tuple.getField0());
+
+    /**
+     * Builds and executes the word-count plan.
+     *
+     * @param args optional {@code [inputTopic [outputTopic]]}; further arguments are only echoed
+     */
+    public static void main(String[] args) {
+        System.out.println(">>> Apache Wayang Test #02");
+        System.out.println(" Process data from a Kafka topic using a 'Java Context'.");
+
+        // Positional arguments override the defaults.
+        String inputTopicName = args.length > 0 ? args[0] : DEFAULT_INPUT_TOPIC;
+        String outputTopicName = args.length > 1 ? args[1] : DEFAULT_OUTPUT_TOPIC;
+
+        if (args.length == 0) {
+            System.out.println("*** Use default topic name: " + inputTopicName);
+        }
+        for (int i = 0; i < args.length; i++) {
+            System.out.println(String.format(" %d - %s", i, args[i]));
+        }
+
+        // Report the effective topics only after the arguments have been applied.
+        System.out.println(" Input topic:  " + inputTopicName);
+        System.out.println(" Output topic: " + outputTopicName);
+
+        Configuration configuration = new Configuration();
+        // Only the Java platform is registered; add .withPlugin(Spark.basicPlugin()) to let the
+        // optimizer also consider Spark.
+        WayangContext wayangContext = new WayangContext(configuration)
+                .withPlugin(Java.basicPlugin());
+        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
+                .withJobName(String.format("WordCount using Java Context on Kafka topic (%s)", inputTopicName))
+                .withUdfJarOf(WordCountOnKafkaTopic.class);
+
+        planBuilder
+                // Read the record values of the input topic; each one is treated as a line of text.
+                .readKafkaTopic(inputTopicName).withName("Load data from topic")
+
+                // Split each line by non-word characters.
+                .flatMap(line -> Arrays.asList(line.split("\\W+")))
+                .withSelectivity(10, 100, 0.9)
+                .withName("Split words")
+
+                // Leading separators or empty records yield empty tokens; drop them.
+                .filter(token -> !token.isEmpty())
+                .withSelectivity(0.99, 0.99, 0.99)
+                .withName("Filter empty words")
+
+                // Lower-case with Locale.ROOT so the counts do not depend on the JVM default
+                // locale (e.g. Turkish dotless i), and attach a counter of 1.
+                .map(word -> new Tuple2<>(word.toLowerCase(Locale.ROOT), 1))
+                .withName("To lower case, add counter")
+
+                // Sum up counters for every word.
+                .reduceByKey(
+                        Tuple2::getField0,
+                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
+                )
+                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
+                .withName("Add counters")
+
+                // Write one "<count>, <word>" record per word; this sink executes the plan.
+                .writeKafkaTopic(outputTopicName, udf, SINK_JOB_NAME,
+                        LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration));
+
+        System.out.println("### Done. ***");
+    }
+}
\ No newline at end of file
diff --git
a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
index 2df9ba36..beff5b2e 100644
---
a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
+++
b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
@@ -139,6 +139,7 @@ public class KafkaTopicSink<T> extends UnarySink<T>
implements Serializable {
if ( props == null ) {
props = getDefaultProperties();
System.out.println(">>> Create producer from DEFAULT PROPERTIES.");
+ props.list( System.out );
}
else {
System.out.println(">>> Create producer from PROPERTIES: " +
props);
diff --git
a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
index a3831b32..ef9e7b8d 100644
---
a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
+++
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
@@ -70,7 +70,7 @@ public class SparkKafkaTopicSource extends KafkaTopicSource
implements SparkExec
System.out.println("### 7 ... ");
this.initConsumer( (KafkaTopicSource) this );
- ConsumerRecords<String, String> records =
this.getConsumer().poll(Duration.ofMillis(15000));
+ ConsumerRecords<String, String> records =
this.getConsumer().poll(Duration.ofMillis(100));
List<String> collectedRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {