This is an automated email from the ASF dual-hosted git repository.

kamir pushed a commit to branch kamir-patch-2
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/kamir-patch-2 by this push:
     new a7afeee4 renamed examples to applications
a7afeee4 is described below

commit a7afeee4299e0f80149c1b101cc929cd2aeb4c3c
Author: Mirko Kämpf <[email protected]>
AuthorDate: Tue Aug 27 14:58:57 2024 +0200

    renamed examples to applications
---
 README.md                                          |   2 +-
 bin/wayang-submit                                  |   7 +-
 guides/develop-with-Wayang.md                      |   8 +-
 guides/ml-in-Wayang.md                             |   2 +-
 guides/tutorial.md                                 |   4 +-
 guides/wayang-examples.md                          |   8 +-
 pom.xml                                            |   7 +-
 wayang-api/wayang-api-scala-java/README.md         |   6 +-
 wayang-applications/bin/run_wordcount.sh           |  31 +++++
 wayang-applications/bin/run_wordcount_kafka.sh     |  31 +++++
 .../data/case-study/DATA_REPO_001/README.md        |   3 +
 .../case-study/My Journey to Apache Wayang.docx    | Bin 0 -> 14729 bytes
 wayang-applications/data/case-study/README.md      |   3 +
 wayang-applications/pom.xml                        | 145 +++++++++++++++++++++
 .../java/org/apache/wayang/applications/App.java   |  28 ++++
 .../wayang/applications/OutputSerializer.java      |  37 ++++++
 .../java/org/apache/wayang/applications/Util.java  |  27 ++++
 .../org/apache/wayang/applications/WordCount.java  |  87 +++++++++++++
 wayang-assembly/dependency-reduced-pom.xml         |   2 +-
 wayang-docs/src/main/resources/index.md            |   6 +-
 .../wayang/java/operators/JavaKafkaTopicSink.java  |  23 ++--
 ...leSinkTest.java => JavaKafkaTopicSinkTest.java} | 106 ++++++++++-----
 .../java/operators/JavaKafkaTopicSourceTest.java   |   7 +-
 .../java/operators/JavaTextFileSinkTest.java       |   9 +-
 24 files changed, 522 insertions(+), 67 deletions(-)

diff --git a/README.md b/README.md
index b4a0130d..46d893c1 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,7 @@ Apache Wayang (incubating) can be used via the following APIs:
 
 ## Quick Guide for Running Wayang
 
-For a quick guide on how to run WordCount see [here](guides/tutorial.md).
+For a quick guide on how to run org.apache.wayang.examples.WordCount see 
[here](guides/tutorial.md).
 
 ## Quick Guide for Developing with Wayang
 
diff --git a/bin/wayang-submit b/bin/wayang-submit
index 28ee041f..d9658c33 100755
--- a/bin/wayang-submit
+++ b/bin/wayang-submit
@@ -18,7 +18,6 @@
 
 CLASS=$1
 
-
 if [ -z "${CLASS}" ]; then
     echo "Target Class for execution was not provided"
     exit 1
@@ -120,5 +119,11 @@ do
   ARGS="$ARGS \"${arg}\""
 done
 
+WAYANG_CLASSPATH="${WAYANG_CLASSPATH}:${WAYANG_APP_HOME}"
+
+echo $WAYANG_CLASSPATH
+echo
+echo "[EXECUTE] :: $RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${ARGS}"
+echo
 eval "$RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${ARGS}"
 
diff --git a/guides/develop-with-Wayang.md b/guides/develop-with-Wayang.md
index 430cf2db..ddd3a84b 100644
--- a/guides/develop-with-Wayang.md
+++ b/guides/develop-with-Wayang.md
@@ -71,7 +71,7 @@ This tutorial shows users how to import Wayang in their Java 
project using the m
 ```
 A sample pom file can be found [here](pom-example.xml).
 
-# Test WordCount
+# Test org.apache.wayang.examples.WordCount
 ## Create a Java class that contains the main method that runs the Wordcount
 Here is a sample implementation getting as input the filename (e.g., 
file:/Projects/Wayang/test.txt)
 
@@ -85,8 +85,8 @@ public static void main(String[] args) {
 
         /* Get a plan builder */
         JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
-                .withJobName("WordCount")
-                .withUdfJarOf(WordCount.class);
+                .withJobName("org.apache.wayang.examples.WordCount")
+                .withUdfJarOf(org.apache.wayang.examples.WordCount.class);
 
         /* Start building the Apache WayangPlan */
         Collection<Tuple2<String, Integer>> wordcounts = planBuilder
@@ -119,4 +119,4 @@ public static void main(String[] args) {
         System.out.println(wordcounts);
 }
 ```
-A sample Java class file can be found [here](WordCount.java).
+A sample Java class file can be found 
[here](org.apache.wayang.examples.WordCount.java).
diff --git a/guides/ml-in-Wayang.md b/guides/ml-in-Wayang.md
index dc693337..bd0392b5 100644
--- a/guides/ml-in-Wayang.md
+++ b/guides/ml-in-Wayang.md
@@ -28,7 +28,7 @@ public class CustomEstimatableCost implements EstimatableCost 
{
      * by implementing the interface in this class.
      */
 }
-public class WordCount {
+public class org.apache.wayang.examples.WordCount {
     public static void main(String[] args) {
         /* Create a Wayang context and specify the platforms Wayang will 
consider */
         Configuration config = new Configuration();
diff --git a/guides/tutorial.md b/guides/tutorial.md
index f3fdca60..1f196cbd 100644
--- a/guides/tutorial.md
+++ b/guides/tutorial.md
@@ -16,7 +16,7 @@
   limitations under the License.
 
 -->
-This tutorial will show users how to run the WordCount example locally with 
Wayang.
+This tutorial will show users how to run the 
org.apache.wayang.examples.WordCount example locally with Wayang.
 
 # Clone repository
 ```shell
@@ -57,7 +57,7 @@ source ~/.zshrc
 
 # Run the program
 
-To execute the WordCount example with Apache Wayang, you need to execute your 
program with the 'wayang-submit' command:
+To execute the org.apache.wayang.examples.WordCount example with Apache 
Wayang, you need to execute your program with the 'wayang-submit' command:
 
 ```shell
 cd wayang-0.7.1-SNAPSHOT
diff --git a/guides/wayang-examples.md b/guides/wayang-examples.md
index 2717dde0..cd464139 100644
--- a/guides/wayang-examples.md
+++ b/guides/wayang-examples.md
@@ -17,13 +17,13 @@
 
 -->
 This page contains examples to be executed using Wayang.
-- [WordCount](#wordcount)
+- [org.apache.wayang.examples.WordCount](#wordcount)
   * [Java scala-like API](#java-scala-like-api)
   * [Scala API](#scala-api)
 - [k-means](#k-means)
   * [Scala API](#scala-api-1)
 
-## WordCount
+## org.apache.wayang.examples.WordCount
 
 The "Hello World!" of data processing systems is the wordcount.
 
@@ -51,7 +51,7 @@ public class WordcountJava {
                 .withPlugin(Java.basicPlugin())
                 .withPlugin(Spark.basicPlugin());
         JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
-                .withJobName(String.format("WordCount (%s)", inputUrl))
+                
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)", 
inputUrl))
                 .withUdfJarOf(WordcountJava.class);
 
         // Start building the WayangPlan.
@@ -107,7 +107,7 @@ object WordcountScala {
       .withPlugin(Java.basicPlugin)
       .withPlugin(Spark.basicPlugin)
     val planBuilder = new PlanBuilder(wayangContext)
-      .withJobName(s"WordCount ($inputUrl)")
+      .withJobName(s"org.apache.wayang.examples.WordCount ($inputUrl)")
       .withUdfJarsOf(this.getClass)
 
     val wordcounts = planBuilder
diff --git a/pom.xml b/pom.xml
index 0aa63c58..1a35f001 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1338,14 +1338,15 @@
     <modules>
         <module>wayang-commons</module>
         <module>wayang-platforms</module>
-        <module>wayang-tests-integration</module>
         <module>wayang-api</module>
         <module>wayang-profiler</module>
         <module>wayang-plugins</module>
         <module>wayang-resources</module>
-        <module>wayang-benchmark</module>
         <module>wayang-assembly</module>
         <module>wayang-ml4all</module>
-        <!-- <module>wayang-docs</module> -->
+        <module>wayang-applications</module>
+        <module>wayang-benchmark</module>
+        <module>wayang-tests-integration</module>
+        <!--module>wayang-docs</module-->
     </modules>
 </project>
diff --git a/wayang-api/wayang-api-scala-java/README.md 
b/wayang-api/wayang-api-scala-java/README.md
index 085741b6..21ae71d0 100644
--- a/wayang-api/wayang-api-scala-java/README.md
+++ b/wayang-api/wayang-api-scala-java/README.md
@@ -87,12 +87,12 @@ import org.apache.wayang.core.api.Configuration
 import org.apache.wayang.java.Java
 import org.apache.wayang.spark.Spark
 
-class WordCount {}
+class org.apache.wayang.examples.WordCount {}
 
-object WordCount {
+object org.apache.wayang.examples.WordCount {
 
   def main(args: Array[String]): Unit = {
-    println("WordCount")
+    println("org.apache.wayang.examples.WordCount")
     println("Scala version:")
     println(scala.util.Properties.versionString)
 
diff --git a/wayang-applications/bin/run_wordcount.sh 
b/wayang-applications/bin/run_wordcount.sh
new file mode 100755
index 00000000..87b026a9
--- /dev/null
+++ b/wayang-applications/bin/run_wordcount.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
+
+cd ..
+cd ..
+
+#mvn clean compile package install -pl :wayang-assembly -Pdistribution 
-DskipTests
+
+cd wayang-applications
+mvn compile package install -DskipTests
+
+cd ..
+
+source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount 
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
diff --git a/wayang-applications/bin/run_wordcount_kafka.sh 
b/wayang-applications/bin/run_wordcount_kafka.sh
new file mode 100755
index 00000000..87b026a9
--- /dev/null
+++ b/wayang-applications/bin/run_wordcount_kafka.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export JAVA_HOME=/Users/kamir/.sdkman/candidates/java/current
+
+cd ..
+cd ..
+
+#mvn clean compile package install -pl :wayang-assembly -Pdistribution 
-DskipTests
+
+cd wayang-applications
+mvn compile package install -DskipTests
+
+cd ..
+
+source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount 
java file://$(pwd)/wayang-applications/data/case-study/DATA_REPO_001/README.md
diff --git a/wayang-applications/data/case-study/DATA_REPO_001/README.md 
b/wayang-applications/data/case-study/DATA_REPO_001/README.md
new file mode 100644
index 00000000..79b51add
--- /dev/null
+++ b/wayang-applications/data/case-study/DATA_REPO_001/README.md
@@ -0,0 +1,3 @@
+# Blossom Sky Setup on the "central node"
+
+docker pull ghcr.io/databloom-ai/bde:main
diff --git a/wayang-applications/data/case-study/My Journey to Apache 
Wayang.docx b/wayang-applications/data/case-study/My Journey to Apache 
Wayang.docx
new file mode 100644
index 00000000..b6e8d779
Binary files /dev/null and b/wayang-applications/data/case-study/My Journey to 
Apache Wayang.docx differ
diff --git a/wayang-applications/data/case-study/README.md 
b/wayang-applications/data/case-study/README.md
new file mode 100644
index 00000000..79b51add
--- /dev/null
+++ b/wayang-applications/data/case-study/README.md
@@ -0,0 +1,3 @@
+# Blossom Sky Setup on the "central node"
+
+docker pull ghcr.io/databloom-ai/bde:main
diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml
new file mode 100644
index 00000000..756b6b20
--- /dev/null
+++ b/wayang-applications/pom.xml
@@ -0,0 +1,145 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.wayang</groupId>
+        <artifactId>wayang</artifactId>
+        <version>0.7.1</version>
+    </parent>
+
+    <artifactId>wayang-applications</artifactId>
+    <packaging>jar</packaging>
+
+    <name>wayang-applications</name>
+    <url>http://maven.apache.org</url>
+
+
+    <properties>
+        
<DATA_REPO_001>file://${project.basedir}/wayang-applications/data/case-study/DATA_REPO_001</DATA_REPO_001>
+        
<DATA_REPO_002>https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile</DATA_REPO_002>
+        
<DIST_WAYANG_HOME>${project.basedir}/../wayang-assembly/target/apache-wayang-assembly-0.7.1-incubating-dist/wayang-0.7.1</DIST_WAYANG_HOME>
+        <WAYANG_VERSION>0.7.1</WAYANG_VERSION>
+
+        <spark.version>3.5.0</spark.version>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+
+        <!-- We also specify the file encoding of our source files, to avoid a 
warning -->
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <assertj.version>3.17.2</assertj.version>
+        <commons-io.version>2.5</commons-io.version>
+        <guava.version>19.0</guava.version>
+        <hamcrest.version>1.3</hamcrest.version>
+        <jackson.version>2.10.2</jackson.version>
+        <jacoco.version>0.8.5</jacoco.version>
+        <jodatime.version>2.10.6</jodatime.version>
+        <jsonpath.version>2.4.0</jsonpath.version>
+        <junit5.version>5.6.1</junit5.version>
+        <mockito.version>3.5.10</mockito.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-core</artifactId>
+            <version>${WAYANG_VERSION}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-basic</artifactId>
+            <version>${WAYANG_VERSION}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-java</artifactId>
+            <version>${WAYANG_VERSION}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark_2.12</artifactId>
+            <version>${WAYANG_VERSION}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-scala-java_2.12</artifactId>
+            <version>${WAYANG_VERSION}</version>
+            <!--scope>system</scope-->
+            
<!--systemPath>/Users/mkaempf/GITHUB.private/kamir-incubator-wayang/wayang-assembly/target/apache-wayang-assembly-0.7.1-SNAPSHOT-incubating-dist/wayang-0.7.1-SNAPSHOT/jars/wayang-api-scala-java_2.12-0.7.1-SNAPSHOT.jar</systemPath-->
+        </dependency>
+        <!--dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-scala-java_2.12</artifactId>
+            <version>${WAYANG_VERSION}</version>
+        </dependency-->
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.4.0</version> <!-- Use the latest version available -->
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.15.4</version>
+        </dependency>
+
+        <!-- Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.3.1</version>
+                <!--configuration>
+                    
<outputDirectory>/opt/homebrew/opt/apache-spark/jars</outputDirectory>
+                </configuration-->
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/wayang-applications/src/main/java/org/apache/wayang/applications/App.java 
b/wayang-applications/src/main/java/org/apache/wayang/applications/App.java
new file mode 100644
index 00000000..4463b67e
--- /dev/null
+++ b/wayang-applications/src/main/java/org/apache/wayang/applications/App.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+public class App 
+{
+    public static void main( String[] args )
+    {
+        System.out.println( "Hello Apache Wayang friends!" );
+    }
+
+}
diff --git 
a/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
 
b/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
new file mode 100644
index 00000000..679a01c2
--- /dev/null
+++ 
b/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.applications;
+
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
+import org.apache.wayang.basic.data.Tuple2;
+
+public class OutputSerializer implements SerializableFunction<Tuple2<String, 
Integer>, String> {
+
+    @Override
+    public String apply(Tuple2<String, Integer> tuple) {
+        return tuple.getField0() + ": " + tuple.getField1();
+    }
+
+    // Example usage within a main method or similar test environment
+    public static void main(String[] args) {
+        OutputSerializer formatter = new OutputSerializer();
+        Tuple2<String, Integer> exampleTuple = new Tuple2<>("Age", 30);
+        String result = formatter.apply(exampleTuple);
+        System.out.println(result); // Output: Age: 30
+    }
+}
diff --git 
a/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java 
b/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java
new file mode 100644
index 00000000..747feb78
--- /dev/null
+++ b/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import java.io.Serializable;
+
+public class Util implements Serializable {
+    public static String formatData( String f1, Integer f2 ) {
+        return String.format("%d, %s", f1, f2);
+    }
+}
diff --git 
a/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
 
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
new file mode 100644
index 00000000..abd3d332
--- /dev/null
+++ 
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import 
org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+import java.util.Collection;
+import java.util.Arrays;
+
+public class WordCount {
+
+    public static void main(String[] args){
+
+        System.out.println( ">>> Apache Wayang Test #01");
+        System.out.println( "    We use a local file and a 'Java Context'.");
+        int i = 0;
+        for (String arg : args) {
+            String line = String.format( "  %d    - %s", i,arg);
+            System.out.println(line);
+            i=i+1;
+        }
+
+        // Settings
+        String inputUrl = args[1];
+
+        // Get a plan builder.
+        WayangContext wayangContext = new WayangContext(new Configuration())
+                .withPlugin(Java.basicPlugin());
+        //        .withPlugin(Spark.basicPlugin());
+        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
+                
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)", 
inputUrl))
+                .withUdfJarOf(WordCount.class);
+
+        // Start building the WayangPlan.
+        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
+                // Read the text file.
+                .readTextFile(inputUrl).withName("Load file")
+
+                // Split each line by non-word characters.
+                .flatMap(line -> Arrays.asList(line.split("\\W+")))
+                .withSelectivity(10, 100, 0.9)
+                .withName("Split words")
+
+                // Filter empty tokens.
+                .filter(token -> !token.isEmpty())
+                .withSelectivity(0.99, 0.99, 0.99)
+                .withName("Filter empty words")
+
+                // Attach counter to each word.
+                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To 
lower case, add counter")
+
+                // Sum up counters for every word.
+                .reduceByKey(
+                        Tuple2::getField0,
+                        (t1, t2) -> new Tuple2<>(t1.getField0(), 
t1.getField1() + t2.getField1())
+                )
+                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 
1, false, in -> Math.round(0.01 * in[0])))
+                .withName("Add counters")
+
+                // Execute the plan and collect the results.
+                .collect();
+
+        System.out.println(wordcounts);
+        System.out.println( "*** Done. ***" );
+    }
+}
\ No newline at end of file
diff --git a/wayang-assembly/dependency-reduced-pom.xml 
b/wayang-assembly/dependency-reduced-pom.xml
index 7713b09f..70d4e89b 100644
--- a/wayang-assembly/dependency-reduced-pom.xml
+++ b/wayang-assembly/dependency-reduced-pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -17,6 +16,7 @@
   specific language governing permissions and limitations
   under the License.
   -->
+<?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
   <parent>
     <artifactId>wayang</artifactId>
diff --git a/wayang-docs/src/main/resources/index.md 
b/wayang-docs/src/main/resources/index.md
index 14c6834f..3252a7d6 100644
--- a/wayang-docs/src/main/resources/index.md
+++ b/wayang-docs/src/main/resources/index.md
@@ -172,7 +172,7 @@ This tool will attempt to determine suitable values for the 
question marks (`?`)
 
 For some executable examples, have a look at [this 
repository](https://github.com/sekruse/rheem-examples).
 
-### WordCount
+### org.apache.wayang.examples.WordCount
 
 #### Java API
 ```java
@@ -198,7 +198,7 @@ public class WordcountJava {
                 .withPlugin(Java.basicPlugin())
                 .withPlugin(Spark.basicPlugin());
         JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
-                .withJobName(String.format("WordCount (%s)", inputUrl))
+                
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)", 
inputUrl))
                 .withUdfJarOf(WordcountJava.class);
 
         // Start building the WayangPlan.
@@ -254,7 +254,7 @@ object WordcountScala {
       .withPlugin(Java.basicPlugin)
       .withPlugin(Spark.basicPlugin)
     val planBuilder = new PlanBuilder(wayangContext)
-      .withJobName(s"WordCount ($inputUrl)")
+      .withJobName(s"org.apache.wayang.examples.WordCount ($inputUrl)")
       .withUdfJarsOf(this.getClass)
 
     val wordcounts = planBuilder
diff --git 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
index 23535307..8ca09676 100644
--- 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
+++ 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
@@ -48,19 +48,24 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Implementation fo the {@link KafkaTopicSink} for the {@link JavaPlatform}.
  */
 public class JavaKafkaTopicSink<T> extends KafkaTopicSink<T> implements 
JavaExecutionOperator {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(JavaKafkaTopicSink.class);
+
     public JavaKafkaTopicSink(String topicName, TransformationDescriptor<T, 
String> formattingDescriptor) {
         super(topicName, formattingDescriptor);
-        System.out.println("---> CREATE JavaKafkaTopicSink ... (2)");
+        logger.info("---> CREATE JavaKafkaTopicSink ... (Option 2)");
     }
 
     public JavaKafkaTopicSink(KafkaTopicSink<T> that) {
         super(that);
-        System.out.println("---> CREATE JavaKafkaTopicSink ... (1)");
+        logger.info("---> CREATE JavaKafkaTopicSink ... (Option 1)");
     }
 
     @Override
@@ -72,19 +77,17 @@ public class JavaKafkaTopicSink<T> extends 
KafkaTopicSink<T> implements JavaExec
         assert inputs.length == 1;
         assert outputs.length == 0;
 
-        System.out.println("---> WRITE TO KAFKA SINK...");
+        logger.info("---> WRITE TO KAFKA SINK...");
 
-        System.out.println("### 9 ... ");
+        logger.info("### 9 ... ");
 
         JavaChannelInstance input = (JavaChannelInstance) inputs[0];
 
         initProducer( (KafkaTopicSink<T>) this );
 
-        // File f = new File( "./" + this.topicName + ".txt" );
-
         final Function<T, String> formatter = 
javaExecutor.getCompiler().compile(this.formattingDescriptor);
 
-        System.out.println("### 10 ... ");
+        logger.info("### 10 ... ");
 
         try ( KafkaProducer<String,String> producer = getProducer() ) {
             input.<T>provideStream().forEach(
@@ -103,10 +106,10 @@ public class JavaKafkaTopicSink<T> extends 
KafkaTopicSink<T> implements JavaExec
                             producer.send(record, (metadata, exception) -> {
                                 if (exception != null) {
                                     // Handle any exceptions thrown during send
-                                    System.err.println("Failed to send 
message: " + exception.getMessage());
+                                    logger.error("Failed to send message: " + 
exception.getMessage());
                                 } else {
                                     // Optionally handle successful send, log 
metadata, etc.
-                                    System.out.println("Message sent 
successfully to " + metadata.topic() + " partition " + metadata.partition());
+                                    logger.info("Message sent successfully to 
" + metadata.topic() + " partition " + metadata.partition());
                                 }
                             });
                         } catch (Exception ex) {
@@ -118,7 +121,7 @@ public class JavaKafkaTopicSink<T> extends 
KafkaTopicSink<T> implements JavaExec
             throw new WayangException("Writing to Kafka topic failed.", e);
         }
 
-        System.out.println("### 11 ... ");
+        logger.info("### 11 ... ");
 
         return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
     }
diff --git 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
similarity index 51%
copy from 
wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
copy to 
wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
index 502d9ef6..331c63d0 100644
--- 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
+++ 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
@@ -33,6 +33,9 @@ import org.apache.wayang.java.channels.StreamChannel;
 import org.apache.wayang.java.execution.JavaExecutor;
 import org.apache.wayang.java.platform.JavaPlatform;
 
+import org.apache.wayang.basic.operators.KafkaTopicSource;
+
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -46,14 +49,20 @@ import java.util.List;
 import java.util.Locale;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.Properties;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * Test suite for {@link JavaTextFileSink}.
+ * Test suite for {@link JavaKafkaTopicSink}.
  */
-public class JavaTextFileSinkTest extends JavaExecutionOperatorTestBase {
+public class JavaKafkaTopicSinkTest extends JavaExecutionOperatorTestBase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JavaTextFileSinkTest.class);
 
     private Locale defaultLocale;
 
@@ -73,37 +82,76 @@ public class JavaTextFileSinkTest extends 
JavaExecutionOperatorTestBase {
         Locale.setDefault(defaultLocale);
     }
 
+
+
     @Test
-    public void testWritingLocalFile() throws IOException, URISyntaxException {
+    public void testWritingToKafkaTopic() throws Exception {
+
         Configuration configuration = new Configuration();
 
-        final File tempDir = LocalFileSystem.findTempDir();
-        final String targetUrl = LocalFileSystem.toURL(new File(tempDir, 
"testWritingLocalFile.txt"));
-        JavaTextFileSink<Float> sink = new JavaTextFileSink<>(
-                targetUrl,
-                new TransformationDescriptor<>(
-                        f -> String.format("%.2f", f),
-                        Float.class, String.class
-                )
-        );
-
-        Job job = mock(Job.class);
-        when(job.getConfiguration()).thenReturn(configuration);
-        final JavaExecutor javaExecutor = (JavaExecutor) 
JavaPlatform.getInstance().createExecutor(job);
-
-        StreamChannel.Instance inputChannelInstance = (StreamChannel.Instance) 
StreamChannel.DESCRIPTOR
-                .createChannel(mock(OutputSlot.class), configuration)
-                .createInstance(javaExecutor, 
mock(OptimizationContext.OperatorContext.class), 0);
-        inputChannelInstance.accept(Stream.of(1.123f, -0.1f, 3f));
-        evaluate(sink, new ChannelInstance[]{inputChannelInstance}, new 
ChannelInstance[0]);
-
-
-        final List<String> lines = Files.lines(Paths.get(new 
URI(targetUrl))).collect(Collectors.toList());
-        Assert.assertEquals(
-                Arrays.asList("1.12", "-0.10", "3.00"),
-                lines
-        );
+        // We assume, that we write back into the same cluster, to avoid 
"external copies"...
+        Properties props = KafkaTopicSource.getDefaultProperties();
+
+        logger.info(">>> Test: testWriteIntoKafkaTopic()");
+
+        final String topicName1 = "banking-tx-small-csv";
+
+        logger.info("> 0 ... ");
+
+        logger.info( "*** [TOPIC-Name] " + topicName1 + " ***");
+
+        logger.info( ">   Write to topic ... ");
+
+        logger.info("> 1 ... ");
+
+        props.list(System.out);
+
+        logger.info("> 2 ... ");
+        
+        JavaExecutor javaExecutor = null;
+        
+        try {
+
+            JavaKafkaTopicSink<Float> sink = new JavaKafkaTopicSink<>(
+                    topicName1,
+                    new TransformationDescriptor<>(
+                            f -> String.format("%.2f", f),
+                            Float.class, String.class
+                    )
+            );
+
+            logger.info("> 3 ... ");
+            
+            Job job = mock(Job.class);
+            when(job.getConfiguration()).thenReturn(configuration);
+            javaExecutor = (JavaExecutor) 
JavaPlatform.getInstance().createExecutor(job);
+
+            StreamChannel.Instance inputChannelInstance = 
(StreamChannel.Instance) StreamChannel.DESCRIPTOR
+                    .createChannel(mock(OutputSlot.class), configuration)
+                    .createInstance(javaExecutor, 
mock(OptimizationContext.OperatorContext.class), 0);
+            inputChannelInstance.accept(Stream.of(1.123f, -0.1f, 3f));
+            evaluate(sink, new ChannelInstance[]{inputChannelInstance}, new 
ChannelInstance[0]);
+
+            logger.info("> 4 ... ");
+
+        }
+        catch (Exception ex ) {
+            
+            ex.printStackTrace();
+
+            logger.info("##5## ... ");
+
+            Assert.fail();
+        
+        }
+
+        Assert.assertTrue( true );
+
+        logger.info("> *6*");
+
 
     }
 
+
+
 }
diff --git 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
index 2264284b..c78436ca 100644
--- 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
+++ 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
@@ -70,13 +70,13 @@ public class JavaKafkaTopicSourceTest extends 
JavaExecutionOperatorTestBase {
         Locale.setDefault(defaultLocale);
     }
 
-    // @Test
+    @Test
     public void testA() throws Exception {
         Assert.assertEquals(3, 3);
         logger.info(">>> Test A");
     }
 
-    // @Test
+    @Test
     public void testReadFromKafkaTopic() {
 
         logger.info(">>> Test: testReadFromKafkaTopic()");
@@ -91,8 +91,7 @@ public class JavaKafkaTopicSourceTest extends 
JavaExecutionOperatorTestBase {
 
         logger.info("> 1 ... ");
 
-        //Properties props = KafkaTopicSource.getDefaultProperties();
-        Properties props = new Properties();
+        Properties props = KafkaTopicSource.getDefaultProperties();
         
         logger.info("> 2 ... ");
 
diff --git 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
index 502d9ef6..d56d2416 100644
--- 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
+++ 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
@@ -50,11 +50,16 @@ import java.util.stream.Stream;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Test suite for {@link JavaTextFileSink}.
  */
 public class JavaTextFileSinkTest extends JavaExecutionOperatorTestBase {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(JavaTextFileSinkTest.class);
+
     private Locale defaultLocale;
 
     /**
@@ -70,6 +75,7 @@ public class JavaTextFileSinkTest extends 
JavaExecutionOperatorTestBase {
 
     @After
     public void teardownTest() {
+
         Locale.setDefault(defaultLocale);
     }
 
@@ -78,7 +84,7 @@ public class JavaTextFileSinkTest extends 
JavaExecutionOperatorTestBase {
         Configuration configuration = new Configuration();
 
         final File tempDir = LocalFileSystem.findTempDir();
-        final String targetUrl = LocalFileSystem.toURL(new File(tempDir, 
"testWritingLocalFile.txt"));
+        final String targetUrl = LocalFileSystem.toURL(new File(tempDir, 
"testWritingLocalFile_2.txt"));
         JavaTextFileSink<Float> sink = new JavaTextFileSink<>(
                 targetUrl,
                 new TransformationDescriptor<>(
@@ -106,4 +112,5 @@ public class JavaTextFileSinkTest extends 
JavaExecutionOperatorTestBase {
 
     }
 
+
 }


Reply via email to