This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a3fbd2e  [Pulsar-Flink] Add Batch Sink Scala Examples Documentation (#3190)
a3fbd2e is described below

commit a3fbd2ecb8d072fd7f2321c01801dfdb33943e66
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sat Dec 15 01:45:41 2018 +0000

    [Pulsar-Flink] Add Batch Sink Scala Examples Documentation (#3190)
    
    ### Motivation
    This PR adds documentation for the Flink Batch Sink Scala examples, explaining how to use the sink in Scala applications with different output formats.
    
    ### Modifications
    1- `README.md`
    2- Fix typos in `io-rabbitmq.md`
---
 examples/flink-consumer-source/pom.xml             |   2 +-
 .../example/FlinkPulsarBatchCsvSinkExample.java    |   0
 .../example/FlinkPulsarBatchJsonSinkExample.java   |   0
 .../example/FlinkPulsarBatchSinkExample.java       |   0
 .../batch/connectors/pulsar/example/README.md      |  21 ++
 .../example}/PulsarConsumerSourceWordCount.java    |   2 +-
 .../streaming/connectors/pulsar/example}/README.md |   0
 .../FlinkPulsarBatchCsvSinkScalaExample.scala      |   0
 .../FlinkPulsarBatchJsonSinkScalaExample.scala     |   0
 .../example/FlinkPulsarBatchSinkScalaExample.scala |   0
 .../batch/connectors/pulsar/example/README.md      | 283 +++++++++++++++++++++
 pulsar-flink/pom.xml                               |   6 -
 site2/docs/io-rabbitmq.md                          |   4 +-
 13 files changed, 308 insertions(+), 10 deletions(-)

diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 3bf7b6c..2f674fc 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -68,7 +68,7 @@
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.pulsar.examples.flink.PulsarConsumerSourceWordCount</mainClass>
+                  <mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
                 </transformer>
               </transformers>
               <finalName>pulsar-flink-streaming-wordcount</finalName>
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
similarity index 92%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 2ab6ec0..2f99e76 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -1,3 +1,24 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
 The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
 
 # Prerequisites
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
similarity index 98%
rename from examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
index e163f60..547b635 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.examples.flink;
+package org.apache.flink.streaming.connectors.pulsar.example;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
diff --git a/examples/flink-consumer-source/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
similarity index 100%
rename from examples/flink-consumer-source/README.md
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
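The `FlinkPulsarBatchCsvSinkScalaExample` renamed above upper-cases NASA mission names and keeps only missions that started after 1970; the Scala README added in this commit lists the two CSV lines it is expected to write. As a hedged sanity check of that sample output, the sketch below replays the same `map` and `filter` steps on a plain Scala `List`, with no Flink or Pulsar dependency. `NasaMissionCsvSketch` and `toCsvLine` are hypothetical names, and joining the four fields with `,` is an assumption read off the sample output, not a description of how `PulsarCsvOutputFormat` serializes tuples.

```scala
// Hypothetical, dependency-free model of the CSV example's transformation.
object NasaMissionCsvSketch {

  // The README's model extends Tuple4; this sketch drops that to stay self-contained.
  final case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)

  val nasaMissions: List[NasaMission] = List(
    NasaMission(1, "Mercury program", 1959, 1963),
    NasaMission(2, "Apollo program", 1961, 1972),
    NasaMission(3, "Gemini program", 1963, 1966),
    NasaMission(4, "Skylab", 1973, 1974),
    NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))

  // Same steps as the Flink job: upper-case the name, then keep missions started after 1970.
  def transform(missions: List[NasaMission]): List[NasaMission] =
    missions
      .map(m => m.copy(missionName = m.missionName.toUpperCase))
      .filter(_.startYear > 1970)

  // Assumption: one record becomes its four fields joined with a comma.
  def toCsvLine(m: NasaMission): String =
    Seq(m.id, m.missionName, m.startYear, m.endYear).mkString(",")

  def main(args: Array[String]): Unit =
    transform(nasaMissions).map(toCsvLine).foreach(println)
}
```

Running `main` prints `4,SKYLAB,1973,1974` and `5,APOLLO–SOYUZ TEST PROJECT,1975,1975`, the same two lines as the README's sample output.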
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
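The Scala README added in this commit notes that the model used by the renamed `FlinkPulsarBatchJsonSinkScalaExample` needs `@BeanProperty` on its fields. The sketch below illustrates why, under the assumption that the JSON serializer behind `PulsarJsonOutputFormat` discovers fields through JavaBeans-style `getX()` methods: it uses the JDK's `java.beans.Introspector` to list the bean properties of two hypothetical case classes, `PlainMission` and `BeanMission`, which differ only in the annotation.

```scala
import java.beans.Introspector
import scala.beans.BeanProperty

object BeanPropertySketch {

  // Scala generates accessors `id()` and `missionName()`, but no JavaBeans getters.
  final case class PlainMission(id: Int, missionName: String)

  // @BeanProperty additionally makes the compiler emit `getId()` and `getMissionName()`.
  final case class BeanMission(@BeanProperty id: Int, @BeanProperty missionName: String)

  // Names of the properties that JavaBeans introspection finds, ignoring Object's getClass().
  def beanPropertyNames(cls: Class[_]): Set[String] =
    Introspector.getBeanInfo(cls, classOf[Object])
      .getPropertyDescriptors
      .map(_.getName)
      .toSet

  def main(args: Array[String]): Unit = {
    println(beanPropertyNames(classOf[PlainMission]))
    println(beanPropertyNames(classOf[BeanMission]))
  }
}
```

Only `BeanMission` exposes `id` and `missionName` as bean properties; `PlainMission` exposes none, because its Scala accessors do not follow the `getX()` naming convention.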
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
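The `FlinkPulsarBatchSinkScalaExample` renamed above is the word-count job documented in the `PulsarOutputFormat` section of the Scala README added in this commit. Its sample output can be checked without a Flink cluster or a Pulsar broker: this hedged sketch mirrors the job's flatMap, filter, groupBy and reduce steps on standard Scala collections. `WordCountSketch` and `wordCount` are hypothetical names; the sketch models the logic only and does not use the Flink DataSet API.

```scala
// Hypothetical, dependency-free model of the word-count pipeline.
object WordCountSketch {

  // Same shape and toString as the README's WordWithCount.
  final case class WordWithCount(word: String, count: Long) {
    override def toString: String = s"WordWithCount { word = $word, count = $count }"
  }

  val EinsteinQuote: String = "Imagination is more important than knowledge. " +
    "Knowledge is limited. Imagination encircles the world."

  // Counts words longer than four characters, step by step like the Flink job.
  def wordCount(text: String): Map[String, Long] =
    text.toLowerCase
      .split(" ")
      .map(word => WordWithCount(word.replace(".", ""), 1L)) // flatMap: one record per word
      .filter(_.word.length > 4)                              // filter: drop short words
      .groupBy(_.word)                                        // groupBy: key by the word
      .map { case (word, group) => word -> group.map(_.count).sum } // reduce: sum the counts

  def main(args: Array[String]): Unit =
    wordCount(EinsteinQuote).toSeq.sortBy(_._1).foreach { case (word, count) =>
      println(WordWithCount(word, count))
    }
}
```

The order in which reduced records reach a Flink sink is not fixed (the job runs with parallelism 2), so `main` sorts by word only to make the printed result deterministic; the six counts match the README's sample output.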
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
new file mode 100644
index 0000000..b3f8cb5
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -0,0 +1,283 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
+This document explains how to develop Scala applications that use the Flink Batch Sink.
+# Prerequisites
+
+To use this sink, include a dependency on the `pulsar-flink` library in your build configuration.
+
+# Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+<!-- in your <properties> block -->
+<pulsar.version>{{pulsar:version}}</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-flink</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+```
+
+# Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+def pulsarVersion = "{{pulsar:version}}"
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
+}
+```
+
+# PulsarOutputFormat
+### Usage
+
+The following Scala sample shows how to use `PulsarOutputFormat`:
+
+```scala
+      /**
+        * Data type for words with count.
+        */
+      case class WordWithCount(word: String, count: Long) {
+        override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
+      }
+
+      /**
+        * Implementation
+        */
+      private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+        "Knowledge is limited. Imagination encircles the world."
+      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+      private val TOPIC_NAME = "my-flink-topic"
+
+      def main(args: Array[String]): Unit = {
+
+        // set up the execution environment
+        val env = ExecutionEnvironment.getExecutionEnvironment
+
+        // create PulsarOutputFormat instance
+        val pulsarOutputFormat =
+          new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
+            override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
+          })
+
+        // create DataSet
+        val textDS = env.fromElements[String](EINSTEIN_QUOTE)
+
+        // convert sentence to words
+        textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
+          val words = value.toLowerCase.split(" ")
+          for (word <- words) {
+            out.collect(new WordWithCount(word.replace(".", ""), 1))
+          }
+        })
+
+        // filter words whose length is greater than 4
+        .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
+
+        // group the words
+        .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
+
+        // sum the word counts
+        .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
+          new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
+
+        // write batch data to Pulsar
+        .output(pulsarOutputFormat)
+
+        // set parallelism to write to Pulsar in parallel (optional)
+        env.setParallelism(2)
+
+        // execute program
+        env.execute("Flink - Pulsar Batch WordCount")
+      }
+```
+
+### Sample Output
+
+Sample output of the application above:
+```
+WordWithCount { word = encircles, count = 1 }
+WordWithCount { word = important, count = 1 }
+WordWithCount { word = imagination, count = 2 }
+WordWithCount { word = limited, count = 1 }
+WordWithCount { word = knowledge, count = 2 }
+WordWithCount { word = world, count = 1 }
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
+In this example, a Flink DataSet is processed as a word count and written to Pulsar.
+
+
+# PulsarCsvOutputFormat
+### Usage
+
+The following Scala sample shows how to use `PulsarCsvOutputFormat`:
+
+```scala
+      /**
+        * NasaMission Model
+        */
+      private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
+        extends Tuple4(id, missionName, startYear, endYear) // Flink's org.apache.flink.api.java.tuple.Tuple4, not scala.Tuple4
+
+      /**
+        * Implementation
+        */
+      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+      private val TOPIC_NAME = "my-flink-topic"
+
+      private val nasaMissions = List(
+        NasaMission(1, "Mercury program", 1959, 1963),
+        NasaMission(2, "Apollo program", 1961, 1972),
+        NasaMission(3, "Gemini program", 1963, 1966),
+        NasaMission(4, "Skylab", 1973, 1974),
+        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+      def main(args: Array[String]): Unit = {
+
+        // set up the execution environment
+        val env = ExecutionEnvironment.getExecutionEnvironment
+
+        // create PulsarCsvOutputFormat instance
+        val pulsarCsvOutputFormat =
+          new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+        // create DataSet
+        val textDS = env.fromCollection(nasaMissions)
+
+        // map NASA mission names to upper case
+        textDS.map(nasaMission => NasaMission(
+          nasaMission.id,
+          nasaMission.missionName.toUpperCase,
+          nasaMission.startYear,
+          nasaMission.endYear))
+
+        // filter missions that started after 1970
+        .filter(_.startYear > 1970)
+
+        // write batch data to Pulsar as CSV
+        .output(pulsarCsvOutputFormat)
+
+        // set parallelism to write to Pulsar in parallel (optional)
+        env.setParallelism(2)
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Csv")
+      }
+```
+
+### Sample Output
+
+Sample output of the application above:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
+In this example, a Flink DataSet is processed and written to Pulsar in CSV format.
+
+
+# PulsarJsonOutputFormat
+### Usage
+
+The following Scala sample shows how to use `PulsarJsonOutputFormat`:
+
+```scala
+      /**
+        * NasaMission Model
+        */
+      private case class NasaMission(@BeanProperty id: Int,
+                             @BeanProperty missionName: String,
+                             @BeanProperty startYear: Int,
+                             @BeanProperty endYear: Int)
+
+      /**
+        * Implementation
+        */
+      private val nasaMissions = List(
+        NasaMission(1, "Mercury program", 1959, 1963),
+        NasaMission(2, "Apollo program", 1961, 1972),
+        NasaMission(3, "Gemini program", 1963, 1966),
+        NasaMission(4, "Skylab", 1973, 1974),
+        NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+      private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+      private val TOPIC_NAME = "my-flink-topic"
+
+      def main(args: Array[String]): Unit = {
+
+        // set up the execution environment
+        val env = ExecutionEnvironment.getExecutionEnvironment
+
+        // create PulsarJsonOutputFormat instance
+        val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+        // create DataSet
+        val nasaMissionDS = env.fromCollection(nasaMissions)
+
+        // map NASA mission names to upper case
+        nasaMissionDS.map(nasaMission =>
+          NasaMission(
+            nasaMission.id,
+            nasaMission.missionName.toUpperCase,
+            nasaMission.startYear,
+            nasaMission.endYear))
+
+        // filter missions that started after 1970
+        .filter(_.startYear > 1970)
+
+        // write batch data to Pulsar
+        .output(pulsarJsonOutputFormat)
+
+        // set parallelism to write to Pulsar in parallel (optional)
+        env.setParallelism(2)
+
+        // execute program
+        env.execute("Flink - Pulsar Batch Json")
+      }
+```
+
+**Note:** Each field of the model must be annotated with `@BeanProperty` so that it is visible during JSON serialization.
+
+### Sample Output
+
+Sample output of the application above:
+```
+{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
+{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
+In this example, a Flink DataSet is processed and written to Pulsar in JSON format.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index e48d213..b91af16 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -52,12 +52,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.binary.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
diff --git a/site2/docs/io-rabbitmq.md b/site2/docs/io-rabbitmq.md
index d2cea15..0edaed1 100644
--- a/site2/docs/io-rabbitmq.md
+++ b/site2/docs/io-rabbitmq.md
@@ -1,12 +1,12 @@
 ---
 id: io-rabbitmq
 title: RabbitMQ Connector
-sidebar_label: RabittMQ Connector
+sidebar_label: RabbitMQ Connector
 ---
 
 ## Source
 
-The RabittMQ Source connector is used for receiving messages from a RabittMQ cluster and writing
+The RabbitMQ Source connector is used for receiving messages from a RabbitMQ cluster and writing
 messages to Pulsar topics.
 
 ### Source Configuration Options
