This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 71003e2  [documentation][example] Flink Source & Sink Connector (#2561)
71003e2 is described below

commit 71003e20e6606527c2f538ba2059850d9539d878
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Wed Sep 12 01:38:56 2018 -0700

    [documentation][example] Flink Source & Sink Connector (#2561)

    ### Motivation

    We added the Flink source connector (#2441) and sink connector (#2434). It would be great to have an example that shows how to use the Flink source & sink connectors.

    ### Modifications

    - introduce an `examples` module
    - introduce an `examples/flink-consumer-source` module
    - add a word count example that uses the Flink source and sink connectors

    ### Result

    Users can see how to use the Flink source & sink connectors.
---
 examples/flink-consumer-source/README.md           |  76 +++++++++++++
 examples/flink-consumer-source/pom.xml             |  97 ++++++++++++++++
 .../flink/PulsarConsumerSourceWordCount.java       | 126 +++++++++++++++++++++
 examples/pom.xml                                   |  39 +++++++
 pom.xml                                            |   3 +
 .../connectors/pulsar/FlinkPulsarProducer.java     |  10 +-
 .../connectors/pulsar/PulsarConsumerSource.java    |   1 -
 .../pulsar/partitioner/PulsarKeyExtractor.java     |   4 +-
 8 files changed, 350 insertions(+), 6 deletions(-)

diff --git a/examples/flink-consumer-source/README.md b/examples/flink-consumer-source/README.md
new file mode 100644
index 0000000..38b75b6
--- /dev/null
+++ b/examples/flink-consumer-source/README.md
@@ -0,0 +1,76 @@
+## Apache Flink Connectors for Pulsar
+
+This page describes how to use the connectors to read and write Pulsar topics with [Apache Flink](https://flink.apache.org/) stream processing applications.
+
+Build end-to-end stream processing pipelines that use Pulsar as the stream storage and message bus, and Apache Flink for computation over the streams.
+See the [Pulsar Concepts](https://pulsar.incubator.apache.org/docs/en/concepts-overview/) page for more information.
+
+## Example
+
+### PulsarConsumerSourceWordCount
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to stdout or another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+    You can follow the [instructions](https://pulsar.incubator.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+    ```shell
+    $ bin/pulsar standalone
+    ```
+
+2. Start Flink locally.
+
+    You can follow the [instructions](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html) to download and start Flink.
+
+    ```shell
+    $ ./bin/start-cluster.sh
+    ```
+
+3. Build the examples.
+
+    ```shell
+    $ cd ${PULSAR_HOME}
+    $ mvn clean install -DskipTests
+    ```
+
+4. Run the word count example to print results to stdout.
+
+    ```shell
+    $ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+    ```
+
+5. Produce messages to topic `test_src`.
+
+    ```shell
+    $ bin/pulsar-client produce -m "hello world test again" -n 100 test_src
+    ```
+
+6. You can check the Flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are flowing in, e.g.:
+
+    ```shell
+PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
+PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
+PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
+PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
+PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
+PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
+PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+    ```
+
+Alternatively, when you run the Flink word count example at step 4, you can choose to dump the result to another Pulsar topic.
+
+```shell
+$ ./bin/flink run ${PULSAR_HOME}/examples/flink-consumer-source/target/pulsar-flink-streaming-wordcount.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the Flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see results similar to those at step 6, where the word count example prints its results to stdout.
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
new file mode 100644
index 0000000..f7ed5d0
--- /dev/null
+++ b/examples/flink-consumer-source/pom.xml
@@ -0,0 +1,97 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar.examples</groupId>
+    <artifactId>pulsar-examples</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.examples</groupId>
+  <artifactId>flink-consumer-source</artifactId>
+  <name>Pulsar Examples :: Flink Consumer Source</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-schema</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-flink</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client-original</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>pulsar-streaming-wordcount</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>false</shadeTestJar>
+              <shadedArtifactAttached>false</shadedArtifactAttached>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.pulsar.examples.flink.PulsarConsumerSourceWordCount</mainClass>
+                </transformer>
+              </transformers>
+              <finalName>pulsar-flink-streaming-wordcount</finalName>
+              <filters>
+                <filter>
+                  <artifact>*</artifact>
+                  <includes>
+                    <include>org/apache/flink/streaming/examples/kafka/**</include>
+                    <include>org/apache/flink/streaming/**</include>
+                    <include>org/apache/pulsar/**</include>
+                  </includes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java b/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
new file mode 100644
index 0000000..e163f60
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/pulsar/examples/flink/PulsarConsumerSourceWordCount.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.examples.flink;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ */
+public class PulsarConsumerSourceWordCount {
+
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", "flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+            .serviceUrl(serviceUrl)
+            .topic(inputTopic)
+            .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+        DataStream<WordWithCount> wc = input
+            .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+                for (String word : line.split("\\s")) {
+                    collector.collect(new WordWithCount(word, 1));
+                }
+            })
+            .returns(WordWithCount.class)
+            .keyBy("word")
+            .timeWindow(Time.seconds(5))
+            .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                new WordWithCount(c1.word, c1.count + c2.count));
+
+        if (null != outputTopic) {
+            wc.addSink(new FlinkPulsarProducer<>(
+                serviceUrl,
+                outputTopic,
+                wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
+                new ProducerConfiguration(),
+                wordWithCount -> wordWithCount.word
+            )).setParallelism(parallelism);
+        } else {
+            // print the results with a single thread, rather than in parallel
+            wc.print().setParallelism(1);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+
+    /**
+     * Data type for words with count.
+     */
+    @AllArgsConstructor
+    @NoArgsConstructor
+    @ToString
+    public static class WordWithCount {
+
+        public String word;
+        public long count;
+
+    }
+
+}
diff --git a/examples/pom.xml b/examples/pom.xml
new file mode 100644
index 0000000..753d0ad
--- /dev/null
+++ b/examples/pom.xml
@@ -0,0 +1,39 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.pulsar.examples</groupId>
+  <artifactId>pulsar-examples</artifactId>
+  <name>Pulsar Examples :: Parent</name>
+
+  <modules>
+    <module>flink-consumer-source</module>
+  </modules>
+
+</project>
diff --git a/pom.xml b/pom.xml
index b349925..6d45a00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,9 @@ flexible messaging model and an intuitive client API.</description>
     <!-- connector-related modules -->
     <module>pulsar-io</module>
 
+    <!-- examples -->
+    <module>examples</module>
+
     <!-- all these 3 modules should be put at the end in this exact sequence -->
     <module>distribution</module>
     <module>docker</module>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bddfee4..2324c55 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -97,10 +97,7 @@ public class FlinkPulsarProducer<IN>
     /**
      * The callback than handles error propagation or logging callbacks.
      */
-    protected transient Function<MessageId, MessageId> successCallback = msgId -> {
-        acknowledgeMessage();
-        return msgId;
-    };
+    protected transient Function<MessageId, MessageId> successCallback;
 
     protected transient Function<Throwable, MessageId> failureCallback;
 
@@ -205,6 +202,11 @@ public class FlinkPulsarProducer<IN>
             flushOnCheckpoint = false;
         }
 
+        this.successCallback = msgId -> {
+            acknowledgeMessage();
+            return msgId;
+        };
+
         if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
             this.failureCallback = cause -> {
                 LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index f1b2595..0d01def 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -127,7 +127,6 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
         while (isRunning) {
             message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
             if (message == null) {
-                LOG.info("unexpected null message");
                 continue;
             }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 90dc21c..270892e 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flink.streaming.connectors.pulsar.partitioner;
 
+import java.io.Serializable;
+
 /**
  * Extract key from a value.
  */
-public interface PulsarKeyExtractor<IN> {
+public interface PulsarKeyExtractor<IN> extends Serializable {
 
     PulsarKeyExtractor NULL = in -> null;
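
The last hunk makes `PulsarKeyExtractor` extend `Serializable`. The commit message does not say why. A plausible reading is that Flink ships sink functions to its workers using Java serialization, so a key-extractor lambda held by the sink (such as `wordWithCount -> wordWithCount.word` in the example) has to be serializable too. The sketch below shows only the plain-Java behaviour behind that reading: a lambda can be serialized when its target interface extends `Serializable`, and cannot otherwise. `PlainExtractor`, `SerializableExtractor`, their `getKey` method, and `canSerialize` are hypothetical stand-ins for illustration, not the connector's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class KeyExtractorSerializationSketch {

    // Hypothetical stand-in for a key extractor that does NOT extend Serializable.
    public interface PlainExtractor<IN> {
        String getKey(IN in);
    }

    // Hypothetical stand-in for a key extractor that extends Serializable.
    public interface SerializableExtractor<IN> extends Serializable {
        String getKey(IN in);
    }

    // Returns true if Java serialization accepts the object, false if it is rejected
    // with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PlainExtractor<String> plain = s -> s;
        SerializableExtractor<String> serializable = s -> s;

        // The lambda's target type decides whether it can be serialized.
        System.out.println("plain lambda serializable: " + canSerialize(plain));
        System.out.println("Serializable lambda serializable: " + canSerialize(serializable));
    }
}
```

Running `main` reports `false` for the plain lambda and `true` for the one whose interface extends `Serializable`.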