This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git
commit 9e02c4c71bde038f4f968e90620deb4c9bd2df38 Author: Richard Deurwaarder <rdeurwaar...@bol.com> AuthorDate: Sat Aug 25 15:21:54 2018 +0200 [FLINK-9311] [pubsub] Clean up / add documentation and style issues in the PubSub connector --- flink-examples-streaming-gcp-pubsub/pom.xml | 108 +++++++++++++++++++++ .../examples/gcp/pubsub/IntegerSerializer.java | 54 +++++++++++ .../examples/gcp/pubsub/PubSubExample.java | 84 ++++++++++++++++ .../examples/gcp/pubsub/PubSubPublisher.java | 67 +++++++++++++ 4 files changed, 313 insertions(+) diff --git a/flink-examples-streaming-gcp-pubsub/pom.xml b/flink-examples-streaming-gcp-pubsub/pom.xml new file mode 100644 index 0000000..ab1c91f --- /dev/null +++ b/flink-examples-streaming-gcp-pubsub/pom.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-examples-build-helper</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-examples-streaming-gcp-pubsub_${scala.binary.version}</artifactId> + <name>flink-examples-streaming-gcp-pubsub</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <finalName>PubSub</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>false</shadeTestJar> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>false</createDependencyReducedPom> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.examples.gcp.pubsub.PubSubExample</mainClass> + </transformer> + </transformers> + <artifactSet> + <includes> + <include>org.apache.flink:flink-connector-gcp-pubsub*</include> + 
<include>com.google.cloud:google-cloud-pubsub</include> + <include>com.google.*:*</include> + <include>org.threeten:*</include> + <include>io.grpc:*</include> + <include>io.opencensus:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google</shadedPattern> + </relocation> + </relocations> + + <filters> + <filter> + <artifact>org.apache.flink:flink-examples-streaming_*</artifact> + <includes> + <include>org/apache/flink/streaming/examples/gcp/pubsub/**</include> + </includes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java new file mode 100644 index 0000000..3c1eab4 --- /dev/null +++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * Serialization and deserialization schema for the integer payloads produced by {@link PubSubPublisher}.
+ *
+ * <p>An integer is encoded as the bytes returned by {@link BigInteger#toByteArray()};
+ * the same encoding is expected when a message is deserialized.
+ */
+class IntegerSerializer implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {
+
+	/**
+	 * Decodes the data of a PubSub message into an Integer.
+	 *
+	 * @param message message whose data holds the encoded integer
+	 * @return the decoded value; if the data encodes a number wider than an int, only its low-order 32 bits are kept
+	 * @throws IOException if the message carries no data
+	 */
+	@Override
+	public Integer deserialize(PubsubMessage message) throws IOException {
+		final byte[] payload = message.getData().toByteArray();
+		if (payload.length == 0) {
+			// BigInteger rejects an empty array with an unchecked NumberFormatException;
+			// report the malformed message through the declared checked exception instead.
+			throw new IOException("Cannot deserialize an Integer from a PubSub message without data");
+		}
+		return new BigInteger(payload).intValue();
+	}
+
+	/**
+	 * Always returns {@code false}: no element marks the end of the stream.
+	 */
+	@Override
+	public boolean isEndOfStream(Integer integer) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<Integer> getProducedType() {
+		return TypeInformation.of(Integer.class);
+	}
+
+	/**
+	 * Encodes an Integer into the byte representation read back by {@link #deserialize(PubsubMessage)}.
+	 *
+	 * @param integer value to encode
+	 * @return the bytes returned by {@link BigInteger#toByteArray()} for the value
+	 */
+	@Override
+	public byte[] serialize(Integer integer) {
+		return BigInteger.valueOf(integer).toByteArray();
+	}
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
new file mode 100644
index 0000000..7b66577
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting the Flink job, it publishes 10 messages to the input topic.
+ *
+ * <p>The Flink job then reads messages from the input subscription, logs the payload
+ * of each message and writes the messages to the output topic.
+ */
+public class PubSubExample {
+	private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+	/**
+	 * Publishes the test messages and then runs the Flink job.
+	 *
+	 * @param args command line arguments; {@code --google-project}, {@code --input-topicName},
+	 *             {@code --input-subscription} and {@code --output-topicName} are all required
+	 * @throws Exception if the Flink job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		// All four parameters below are required, so print the usage when fewer are given
+		// instead of failing later inside getRequired().
+		if (parameterTool.getNumberOfParameters() < 4) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> " +
+				"--google-project <google project name> ");
+			return;
+		}
+
+		String projectName = parameterTool.getRequired("google-project");
+		String inputTopicName = parameterTool.getRequired("input-topicName");
+		String subscriptionName = parameterTool.getRequired("input-subscription");
+		String outputTopicName = parameterTool.getRequired("output-topicName");
+
+		PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+		pubSubPublisher.publish(10);
+
+		runFlinkJob(projectName, subscriptionName, outputTopicName);
+	}
+
+	/**
+	 * Builds and executes the job: PubSub source, logging map step, PubSub sink.
+	 *
+	 * @param projectName Google project used by both the source and the sink
+	 * @param subscriptionName subscription the source reads from
+	 * @param outputTopicName topic the sink writes to
+	 * @throws Exception if building the source or sink, or executing the job, fails
+	 */
+	private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(1000L);
+
+		env.addSource(PubSubSource.newBuilder(Integer.class)
+				.withDeserializationSchema(new IntegerSerializer())
+				.withProjectName(projectName)
+				.withSubscriptionName(subscriptionName)
+				.build())
+			.map(PubSubExample::printAndReturn).disableChaining()
+			.addSink(PubSubSink.newBuilder(Integer.class)
+				.withSerializationSchema(new IntegerSerializer())
+				.withProjectName(projectName)
+				.withTopicName(outputTopicName).build());
+
+		env.execute("Flink Streaming PubSubReader");
+	}
+
+	/**
+	 * Logs the payload at INFO level and passes it on unchanged.
+	 *
+	 * @param i payload of the processed message
+	 * @return the same payload
+	 */
+	private static Integer printAndReturn(Integer i) {
+		LOG.info("Processed message with payload: {}", i);
+		return i;
+	}
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..8f7bfe6
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+/**
+ * Helper class to send PubSubMessages to a PubSub topic.
+ */
+class PubSubPublisher {
+	private final String projectName;
+	private final String topicName;
+
+	/**
+	 * Creates a publisher helper for one topic.
+	 *
+	 * @param projectName Google project that owns the topic
+	 * @param topicName topic the messages are published to
+	 */
+	PubSubPublisher(String projectName, String topicName) {
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	/**
+	 * Publishes messages that each carry a single integer as payload.
+	 * The integers start at 0 and increase by one for each message sent.
+	 * Each publish is awaited before the next message is sent.
+	 *
+	 * @param amountOfMessages amount of messages to send
+	 * @throws RuntimeException wrapping any failure to create the publisher or to publish;
+	 *         if the thread was interrupted, its interrupt status is restored first
+	 */
+	void publish(int amountOfMessages) {
+		Publisher publisher = null;
+		try {
+			publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+			for (int i = 0; i < amountOfMessages; i++) {
+				ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
+				PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+				publisher.publish(message).get();
+
+				System.out.println("Published message: " + i);
+				Thread.sleep(100L);
+			}
+		} catch (InterruptedException e) {
+			// Restore the interrupt status so the caller can still observe the interruption.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException(e);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		} finally {
+			shutdownQuietly(publisher);
+		}
+	}
+
+	/**
+	 * Shuts the publisher down without letting a shutdown failure replace an exception from publishing.
+	 *
+	 * @param publisher publisher to shut down, may be {@code null} if it was never created
+	 */
+	private static void shutdownQuietly(Publisher publisher) {
+		if (publisher == null) {
+			return;
+		}
+		try {
+			publisher.shutdown();
+		} catch (Exception e) {
+			if (e instanceof InterruptedException) {
+				Thread.currentThread().interrupt();
+			}
+			// Best effort: report the failure instead of swallowing it silently.
+			System.err.println("Failed to shut down the PubSub publisher: " + e);
+		}
+	}
+}