This is an automated email from the ASF dual-hosted git repository.

kamir pushed a commit to branch kamir-patch-2
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/kamir-patch-2 by this push:
     new dfe309ac finished WordCount with KafkaSource and KafkaSink
dfe309ac is described below

commit dfe309acebb6131ebee04e28971b0c34abe48c2f
Author: Mirko Kämpf <[email protected]>
AuthorDate: Mon Sep 2 20:59:25 2024 +0200

    finished WordCount with KafkaSource and KafkaSink
---
 .../scala/org/apache/wayang/api/DataQuanta.scala   |   4 +-
 wayang-applications/bin/bootstrap.sh               |  52 +++++++++
 .../bin/{run_wordcount.sh => cleanup.sh}           |  18 +--
 wayang-applications/bin/run_wordcount.sh           |   2 +-
 wayang-applications/bin/run_wordcount_kafka.sh     |   2 +-
 wayang-applications/pom.xml                        |   4 +-
 .../wayang/applications/WordCountOnKafkaTopic.java | 126 +++++++++++++++++++++
 .../wayang/basic/operators/KafkaTopicSink.java     |   1 +
 .../spark/operators/SparkKafkaTopicSource.java     |   2 +-
 9 files changed, 193 insertions(+), 18 deletions(-)

diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index 80ae6742..0dd247a3 100644
--- 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -966,7 +966,8 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
         topicName,
         new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], 
basicDataUnitType[String], udfLoad)
       )
-      sink.setName(s"Write to KafkaTopic $topicName")
+      sink.setName(s"*#-> Write to KafkaTopic $topicName")
+      println(s"*#-> Write to KafkaTopic $topicName")
       this.connectTo(sink, 0)
 
       // Do the execution.
@@ -991,6 +992,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
       new TransformationDescriptor(formatterUdf, basicDataUnitType[Out], 
basicDataUnitType[String], udfLoad)
     )
     sink.setName(s"Write to $url")
+
     this.connectTo(sink, 0)
 
     // Do the execution.
diff --git a/wayang-applications/bin/bootstrap.sh 
b/wayang-applications/bin/bootstrap.sh
new file mode 100755
index 00000000..cdf2a4d7
--- /dev/null
+++ b/wayang-applications/bin/bootstrap.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Recreates (delete, then create) a fixed set of Kafka topics on a Confluent
+# Cloud cluster, lists the cluster's topics, and lists the subjects known to
+# the Schema Registry.
+#
+# Prerequisites (macOS / Homebrew), plus an authenticated CLI session:
+#   brew install confluentinc/tap/cli
+#   brew install jq
+#   brew install git-lfs
+#   confluent login
+#
+# Environment:
+#   CLUSTER_ID                            Kafka cluster id (default: lkc-m2kpj2)
+#   SCHEMA_REGISTRY_URL                   Schema Registry endpoint (default below)
+#   SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO  "<api-key>:<api-secret>" -- REQUIRED.
+#                                         Provide it from the environment or an
+#                                         untracked file; never commit it here.
+
+export topic_l1_a=region_emea_counts
+export topic_l1_b=region_apac_counts
+export topic_l1_c=region_uswest_counts
+export topic_l2_a=global_contribution
+export topic_l2_b=global_averages
+
+CLUSTER_ID="${CLUSTER_ID:-lkc-m2kpj2}"
+
+# NOTE(review): the commands below act on topics literally named
+# "topic_l1_a" ... "topic_l2_b" (the same literal names cleanup.sh deletes),
+# not on the names held by the exported variables above -- confirm which
+# naming is intended before switching to "$topic_l1_a" etc.
+TOPICS="topic_l1_a topic_l1_b topic_l1_c topic_l2_a topic_l2_b"
+
+# No "set -e": a failing delete (e.g. the topic does not exist yet on a first
+# run) must not stop the script from creating the topics afterwards.
+# $TOPICS is intentionally unquoted so it splits into the individual names.
+for topic in $TOPICS; do
+  confluent kafka topic delete "$topic" --cluster "$CLUSTER_ID"
+done
+
+for topic in $TOPICS; do
+  confluent kafka topic create "$topic" --cluster "$CLUSTER_ID"
+done
+
+confluent kafka topic list --cluster "$CLUSTER_ID"
+
+######################################################################################################
+# Schema Registry access, see
+# https://docs.confluent.io/cloud/current/sr/schema_registry_ccloud_tutorial.html
+# Abort with a clear message instead of calling curl without credentials.
+: "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO:?must be set to '<api-key>:<api-secret>'}"
+export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
+export SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_URL:-https://psrc-lo5k9.eu-central-1.aws.confluent.cloud}"
+
+###
+# List schemas
+curl --silent -X GET -u "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO" "$SCHEMA_REGISTRY_URL/subjects" | jq .
diff --git a/wayang-applications/bin/run_wordcount.sh 
b/wayang-applications/bin/cleanup.sh
similarity index 67%
copy from wayang-applications/bin/run_wordcount.sh
copy to wayang-applications/bin/cleanup.sh
index 87b026a9..a81a432c 100755
--- a/wayang-applications/bin/run_wordcount.sh
+++ b/wayang-applications/bin/cleanup.sh
@@ -14,18 +14,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-
-export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
-
-cd ..
-cd ..
-
-#mvn clean compile package install -pl :wayang-assembly -Pdistribution 
-DskipTests
 
-cd wayang-applications
-mvn compile package install -DskipTests
 
-cd ..
+confluent kafka topic delete topic_l1_a --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l1_b --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l1_c --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l2_a --cluster lkc-m2kpj2
+confluent kafka topic delete topic_l2_b --cluster lkc-m2kpj2
 
-source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount 
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
+confluent kafka topic list --cluster lkc-m2kpj2
\ No newline at end of file
diff --git a/wayang-applications/bin/run_wordcount.sh 
b/wayang-applications/bin/run_wordcount.sh
index 87b026a9..9f1a26e8 100755
--- a/wayang-applications/bin/run_wordcount.sh
+++ b/wayang-applications/bin/run_wordcount.sh
@@ -21,7 +21,7 @@ export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
 cd ..
 cd ..
 
-#mvn clean compile package install -pl :wayang-assembly -Pdistribution 
-DskipTests
+mvn clean compile package install -pl :wayang-assembly -Pdistribution 
-DskipTests
 
 cd wayang-applications
 mvn compile package install -DskipTests
diff --git a/wayang-applications/bin/run_wordcount_kafka.sh 
b/wayang-applications/bin/run_wordcount_kafka.sh
index 87b026a9..89b2ff1b 100755
--- a/wayang-applications/bin/run_wordcount_kafka.sh
+++ b/wayang-applications/bin/run_wordcount_kafka.sh
@@ -28,4 +28,4 @@ mvn compile package install -DskipTests
 
 cd ..
 
-source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount 
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
+source ./.env.sh; bin/wayang-submit 
org.apache.wayang.applications.WordCountOnKafkaTopic
\ No newline at end of file
diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml
index 756b6b20..3dab581e 100644
--- a/wayang-applications/pom.xml
+++ b/wayang-applications/pom.xml
@@ -83,8 +83,8 @@
             <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-api-scala-java_2.12</artifactId>
             <version>${WAYANG_VERSION}</version>
-            <!--scope>system</scope-->
-            
<!--systemPath>/Users/mkaempf/GITHUB.private/kamir-incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-SNAPSHOT-incubating-dist/wayang-0.7.1-SNAPSHOT/jars/wayang-api-scala-java_2.12-0.7.1-SNAPSHOT.jar</systemPath-->
+            <scope>system</scope>
+            
<systemPath>/Users/kamir/GITHUB.merge/incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1/jars/wayang-api-scala-java-0.7.1.jar</systemPath>
         </dependency>
         <!--dependency>
             <groupId>org.apache.wayang</groupId>
diff --git 
a/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
 
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
new file mode 100644
index 00000000..03d3096f
--- /dev/null
+++ 
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCountOnKafkaTopic.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.function.FunctionDescriptor;
+import 
org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.java.Java;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Locale;
+
+// Import the Logger class
+import org.apache.log4j.Logger;
+
+
+/**
+ * Word count over the records of a Kafka topic, planned on Wayang's Java platform.
+ * Each distinct lower-cased word is written as a {@code "<count>, <word>"} line to a
+ * second Kafka topic.
+ */
+public class WordCountOnKafkaTopic {
+
+    /** Topic read when no input topic is passed as the first program argument. */
+    private static final String DEFAULT_INPUT_TOPIC = "banking-tx-small-csv";
+
+    /** Topic written when no output topic is passed as the second program argument. */
+    private static final String DEFAULT_OUTPUT_TOPIC = "word_count_contribution___banking-tx-small-csv";
+
+    /**
+     * Builds and runs the word-count plan.
+     *
+     * @param args optional; {@code args[0]} overrides the input topic and
+     *             {@code args[1]} overrides the output topic
+     */
+    public static void main(String[] args) {
+
+        System.out.println( ">>> Apache Wayang Test #02");
+        System.out.println( "    Process data from a Kafka topic using a 'Java Context'.");
+
+        // Resolve the topics before echoing them, so the output reflects the topic actually used.
+        String inputTopic = args.length > 0 ? args[0] : DEFAULT_INPUT_TOPIC;
+        String outputTopic = args.length > 1 ? args[1] : DEFAULT_OUTPUT_TOPIC;
+
+        System.out.println( "    Topic: " + inputTopic );
+
+        if (args.length > 0) {
+            // Echo every program argument together with its index.
+            for (int i = 0; i < args.length; i++) {
+                System.out.println(String.format( "  %d    - %s", i, args[i]));
+            }
+        }
+        else {
+            System.out.println( "*** Use default topic name: " + inputTopic );
+        }
+
+        Configuration configuration = new Configuration();
+        // Only the Java platform plugin is registered for this job.
+        WayangContext wayangContext = new WayangContext(configuration)
+                .withPlugin(Java.basicPlugin());
+        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
+                .withJobName(String.format("WordCount using Java Context on Kafka topic (%s)", inputTopic))
+                .withUdfJarOf(WordCountOnKafkaTopic.class);
+
+        planBuilder
+                // Read the records of the input topic.
+                .readKafkaTopic(inputTopic).withName("Load data from topic")
+
+                // Split each record by non-word characters.
+                .flatMap(line -> Arrays.asList(line.split("\\W+")))
+                .withSelectivity(10, 100, 0.9)
+                .withName("Split words")
+
+                // Filter empty tokens: split() yields a leading "" when a record starts with a separator.
+                .filter(token -> !token.isEmpty())
+                .withSelectivity(0.99, 0.99, 0.99)
+                .withName("Filter empty words")
+
+                // Attach a counter to each word. Locale.ROOT keeps the lower-casing independent
+                // of the JVM default locale (e.g. Turkish would map "I" to a dotless "ı").
+                .map(word -> new Tuple2<>(word.toLowerCase(Locale.ROOT), 1)).withName("To lower case, add counter")
+
+                // Sum up counters for every word.
+                .reduceByKey(
+                        Tuple2::getField0,
+                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
+                )
+                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
+                .withName("Add counters")
+
+                // Terminal sink: one "<count>, <word>" line per word to the output topic.
+                .writeKafkaTopic(outputTopic, d -> String.format("%d, %s", d.getField1(), d.getField0()), "job_test_1",
+                        LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration));
+
+        System.out.println( "### Done. ***" );
+    }
+}
\ No newline at end of file
diff --git 
a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
 
b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
index 2df9ba36..beff5b2e 100644
--- 
a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
+++ 
b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/KafkaTopicSink.java
@@ -139,6 +139,7 @@ public class KafkaTopicSink<T> extends UnarySink<T> 
implements Serializable {
         if ( props == null ) {
             props = getDefaultProperties();
             System.out.println(">>> Create producer from DEFAULT PROPERTIES.");
+            props.list( System.out );
         }
         else {
             System.out.println(">>> Create producer from PROPERTIES: " + 
props);
diff --git 
a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
 
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
index a3831b32..ef9e7b8d 100644
--- 
a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
+++ 
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkKafkaTopicSource.java
@@ -70,7 +70,7 @@ public class SparkKafkaTopicSource extends KafkaTopicSource 
implements SparkExec
         System.out.println("### 7 ... ");
         this.initConsumer( (KafkaTopicSource) this );
 
-        ConsumerRecords<String, String> records = 
this.getConsumer().poll(Duration.ofMillis(15000));
+        ConsumerRecords<String, String> records = 
this.getConsumer().poll(Duration.ofMillis(100));
 
         List<String> collectedRecords = new ArrayList<>();
         for (ConsumerRecord<String, String> record : records) {

Reply via email to