This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3fbd2e [Pulsar-Flink] Add Batch Sink Scala Examples Documentation (#3190)
a3fbd2e is described below
commit a3fbd2ecb8d072fd7f2321c01801dfdb33943e66
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Sat Dec 15 01:45:41 2018 +0000
[Pulsar-Flink] Add Batch Sink Scala Examples Documentation (#3190)
### Motivation
This PR adds documentation for the Flink Batch Sink Scala examples,
explaining how to use the sink in Scala applications with different output formats.
### Modifications
1- `README.md`
2- Fix on `io-rabbitmq.md`
---
examples/flink-consumer-source/pom.xml | 2 +-
.../example/FlinkPulsarBatchCsvSinkExample.java | 0
.../example/FlinkPulsarBatchJsonSinkExample.java | 0
.../example/FlinkPulsarBatchSinkExample.java | 0
.../batch/connectors/pulsar/example/README.md | 21 ++
.../example}/PulsarConsumerSourceWordCount.java | 2 +-
.../streaming/connectors/pulsar/example}/README.md | 0
.../FlinkPulsarBatchCsvSinkScalaExample.scala | 0
.../FlinkPulsarBatchJsonSinkScalaExample.scala | 0
.../example/FlinkPulsarBatchSinkScalaExample.scala | 0
.../batch/connectors/pulsar/example/README.md | 283 +++++++++++++++++++++
pulsar-flink/pom.xml | 6 -
site2/docs/io-rabbitmq.md | 4 +-
13 files changed, 308 insertions(+), 10 deletions(-)
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 3bf7b6c..2f674fc 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -68,7 +68,7 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>org.apache.pulsar.examples.flink.PulsarConsumerSourceWordCount</mainClass>
+<mainClass>org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount</mainClass>
</transformer>
</transformers>
<finalName>pulsar-flink-streaming-wordcount</finalName>
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
similarity index 100%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
similarity index 92%
rename from pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 2ab6ec0..2f99e76 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -1,3 +1,24 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
# Prerequisites
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
similarity index 98%
rename from examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
index e163f60..547b635 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.examples.flink;
+package org.apache.flink.streaming.connectors.pulsar.example;
import static java.nio.charset.StandardCharsets.UTF_8;
diff --git a/examples/flink-consumer-source/README.md b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
similarity index 100%
rename from examples/flink-consumer-source/README.md
rename to examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala
diff --git a/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
similarity index 100%
rename from pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
rename to examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
new file mode 100644
index 0000000..b3f8cb5
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -0,0 +1,283 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
+This document explains how to develop Scala applications using the Flink Batch Sink.
+# Prerequisites
+
+To use this sink, include a dependency for the `pulsar-flink` library in your build configuration.
+
+# Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+<!-- in your <properties> block -->
+<pulsar.version>{{pulsar:version}}</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-flink</artifactId>
+ <version>${pulsar.version}</version>
+</dependency>
+```
+
+# Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+def pulsarVersion = "{{pulsar:version}}"
+
+dependencies {
+ compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
+}
+```
+
+# PulsarOutputFormat
+### Usage
+
+The following Scala example shows how to use `PulsarOutputFormat`:
+
+```scala
+ /**
+ * Data type for words with count.
+ */
+ case class WordWithCount(word: String, count: Long) {
+ override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
+ }
+
+ /**
+ * Implementation
+ */
+ private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+ "Knowledge is limited. Imagination encircles the world."
+ private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+ private val TOPIC_NAME = "my-flink-topic"
+
+ def main(args: Array[String]): Unit = {
+
+ // set up the execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ // create PulsarOutputFormat instance
+ val pulsarOutputFormat =
+ new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
+ override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
+ })
+
+ // create DataSet
+ val textDS = env.fromElements[String](EINSTEIN_QUOTE)
+
+ // convert sentence to words
+ textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
+ val words = value.toLowerCase.split(" ")
+ for (word <- words) {
+ out.collect(new WordWithCount(word.replace(".", ""), 1))
+ }
+ })
+
+ // keep only words longer than 4 characters
+ .filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)
+
+ // group the words
+ .groupBy((wordWithCount: WordWithCount) => wordWithCount.word)
+
+ // sum the word counts
+ .reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
+ new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))
+
+ // write batch data to Pulsar
+ .output(pulsarOutputFormat)
+
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2)
+
+ // execute program
+ env.execute("Flink - Pulsar Batch WordCount")
+ }
+```
+
+### Sample Output
+
+Sample output for the application above:
+```
+WordWithCount { word = encircles, count = 1 }
+WordWithCount { word = important, count = 1 }
+WordWithCount { word = imagination, count = 2 }
+WordWithCount { word = limited, count = 1 }
+WordWithCount { word = knowledge, count = 2 }
+WordWithCount { word = world, count = 1 }
+```
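+
+The word-count logic above can be approximated without Flink or Pulsar. The following is a minimal sketch using only Scala collections, not the connector itself; the object name `WordCountSketch` and its `counts` value are hypothetical and exist only for illustration. It applies the same split, filter, and group steps to the quote, which makes it easy to check the counts listed in the sample output:
+
+```scala
+object WordCountSketch {
+  // Same input sentence as in the Flink example above.
+  val quote: String = "Imagination is more important than knowledge. " +
+    "Knowledge is limited. Imagination encircles the world."
+
+  // Lower-case and split into words, strip periods, keep words longer than
+  // 4 characters, then count the occurrences of each word.
+  val counts: Map[String, Int] = quote.toLowerCase
+    .split(" ")
+    .map(_.replace(".", ""))
+    .filter(_.length > 4)
+    .groupBy(identity)
+    .map { case (word, occurrences) => word -> occurrences.length }
+
+  def main(args: Array[String]): Unit =
+    counts.foreach { case (word, count) => println(s"$word: $count") }
+}
+```
+
+Unlike the Flink job, this sketch runs in a single JVM and writes nothing to a topic; it only mirrors the transformation steps.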
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
+In this example, a Flink DataSet is processed as a word count and written to Pulsar.
+
+
+# PulsarCsvOutputFormat
+### Usage
+
+The following Scala example shows how to use `PulsarCsvOutputFormat`:
+
+```scala
+ /**
+ * NasaMission Model
+ */
+ private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
+ extends Tuple4(id, missionName, startYear, endYear)
+
+ /**
+ * Implementation
+ */
+ private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+ private val TOPIC_NAME = "my-flink-topic"
+
+ private val nasaMissions = List(
+ NasaMission(1, "Mercury program", 1959, 1963),
+ NasaMission(2, "Apollo program", 1961, 1972),
+ NasaMission(3, "Gemini program", 1963, 1966),
+ NasaMission(4, "Skylab", 1973, 1974),
+ NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+ def main(args: Array[String]): Unit = {
+
+ // set up the execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ // create PulsarCsvOutputFormat instance
+ val pulsarCsvOutputFormat =
+ new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+ // create DataSet
+ val textDS = env.fromCollection(nasaMissions)
+
+ // map nasa mission names to upper-case
+ textDS.map(nasaMission => NasaMission(
+ nasaMission.id,
+ nasaMission.missionName.toUpperCase,
+ nasaMission.startYear,
+ nasaMission.endYear))
+
+ // filter missions which started after 1970
+ .filter(_.startYear > 1970)
+
+ // write batch data to Pulsar as CSV
+ .output(pulsarCsvOutputFormat)
+
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2)
+
+ // execute program
+ env.execute("Flink - Pulsar Batch Csv")
+ }
+```
+
+### Sample Output
+
+Sample output for the application above:
+```
+4,SKYLAB,1973,1974
+5,APOLLO–SOYUZ TEST PROJECT,1975,1975
+```
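+
+As a rough illustration of how a record maps to one line of the output above, here is a minimal sketch in plain Scala. It is not the connector's serializer: the `Mission` case class and the `toCsvLine` helper are hypothetical, and joining fields with commas (with no quoting or escaping) is an assumption based only on the sample output.
+
+```scala
+object CsvLineSketch {
+  // Hypothetical stand-in for the NasaMission model.
+  case class Mission(id: Int, missionName: String, startYear: Int, endYear: Int)
+
+  // Join the fields of a case class (a Product) with commas, in declaration order.
+  // Assumption: no field contains a comma, so no quoting is applied.
+  def toCsvLine(record: Product): String = record.productIterator.mkString(",")
+
+  def main(args: Array[String]): Unit =
+    println(toCsvLine(Mission(4, "SKYLAB", 1973, 1974)))
+}
+```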
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
+In this example, a Flink DataSet is processed and written to Pulsar in CSV format.
+
+
+# PulsarJsonOutputFormat
+### Usage
+
+The following Scala example shows how to use `PulsarJsonOutputFormat`:
+
+```scala
+ /**
+ * NasaMission Model
+ */
+ private case class NasaMission(@BeanProperty id: Int,
+ @BeanProperty missionName: String,
+ @BeanProperty startYear: Int,
+ @BeanProperty endYear: Int)
+
+ /**
+ * Implementation
+ */
+ private val nasaMissions = List(
+ NasaMission(1, "Mercury program", 1959, 1963),
+ NasaMission(2, "Apollo program", 1961, 1972),
+ NasaMission(3, "Gemini program", 1963, 1966),
+ NasaMission(4, "Skylab", 1973, 1974),
+ NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))
+
+ private val SERVICE_URL = "pulsar://127.0.0.1:6650"
+ private val TOPIC_NAME = "my-flink-topic"
+
+ def main(args: Array[String]): Unit = {
+
+ // set up the execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ // create PulsarJsonOutputFormat instance
+ val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)
+
+ // create DataSet
+ val nasaMissionDS = env.fromCollection(nasaMissions)
+
+ // map nasa mission names to upper-case
+ nasaMissionDS.map(nasaMission =>
+ NasaMission(
+ nasaMission.id,
+ nasaMission.missionName.toUpperCase,
+ nasaMission.startYear,
+ nasaMission.endYear))
+
+ // filter missions which started after 1970
+ .filter(_.startYear > 1970)
+
+ // write batch data to Pulsar
+ .output(pulsarJsonOutputFormat)
+
+ // set parallelism to write Pulsar in parallel (optional)
+ env.setParallelism(2)
+
+ // execute program
+ env.execute("Flink - Pulsar Batch Json")
+ }
+```
+
+**Note:** Each property of the model must be annotated with `@BeanProperty` so that JavaBean-style getters are generated and the property is visible when the record is written as JSON.
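+
+To see what the annotation changes, the following minimal sketch compares two hypothetical case classes (`PlainMission` and `BeanMission`, neither of which is part of the connector) using plain Java reflection. It only shows that `@BeanProperty` adds `getXxx` methods; that the JSON output format relies on such getters is an assumption based on the note above.
+
+```scala
+import scala.beans.BeanProperty
+
+object BeanPropertySketch {
+  // Without the annotation, only Scala-style accessors (id, missionName) exist.
+  case class PlainMission(id: Int, missionName: String)
+
+  // With the annotation, JavaBean-style getters (getId, getMissionName) are generated as well.
+  case class BeanMission(@BeanProperty id: Int, @BeanProperty missionName: String)
+
+  // Names of the getXxx methods declared directly on a class.
+  def beanGetters(cls: Class[_]): Set[String] =
+    cls.getDeclaredMethods.map(_.getName).filter(_.startsWith("get")).toSet
+
+  def main(args: Array[String]): Unit = {
+    println(beanGetters(classOf[PlainMission]))
+    println(beanGetters(classOf[BeanMission]))
+  }
+}
+```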
+
+### Sample Output
+
+Sample output for the application above:
+```
+{"id":4,"missionName":"SKYLAB","startYear":1973,"endYear":1974}
+{"id":5,"missionName":"APOLLO–SOYUZ TEST PROJECT","startYear":1975,"endYear":1975}
+```
+
+### Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
+In this example, a Flink DataSet is processed and written to Pulsar in JSON format.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index e48d213..b91af16 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -52,12 +52,6 @@
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
diff --git a/site2/docs/io-rabbitmq.md b/site2/docs/io-rabbitmq.md
index d2cea15..0edaed1 100644
--- a/site2/docs/io-rabbitmq.md
+++ b/site2/docs/io-rabbitmq.md
@@ -1,12 +1,12 @@
---
id: io-rabbitmq
title: RabbitMQ Connector
-sidebar_label: RabittMQ Connector
+sidebar_label: RabbitMQ Connector
---
## Source
-The RabittMQ Source connector is used for receiving messages from a RabittMQ cluster and writing
+The RabbitMQ Source connector is used for receiving messages from a RabbitMQ cluster and writing
messages to Pulsar topics.
### Source Configuration Options