This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git

commit 9e02c4c71bde038f4f968e90620deb4c9bd2df38
Author: Richard Deurwaarder <rdeurwaar...@bol.com>
AuthorDate: Sat Aug 25 15:21:54 2018 +0200

    [FLINK-9311] [pubsub] Clean up / add documentation and style issues in the 
PubSub connector
---
 flink-examples-streaming-gcp-pubsub/pom.xml        | 108 +++++++++++++++++++++
 .../examples/gcp/pubsub/IntegerSerializer.java     |  54 +++++++++++
 .../examples/gcp/pubsub/PubSubExample.java         |  84 ++++++++++++++++
 .../examples/gcp/pubsub/PubSubPublisher.java       |  67 +++++++++++++
 4 files changed, 313 insertions(+)

diff --git a/flink-examples-streaming-gcp-pubsub/pom.xml 
b/flink-examples-streaming-gcp-pubsub/pom.xml
new file mode 100644
index 0000000..ab1c91f
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-examples-build-helper</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       
<artifactId>flink-examples-streaming-gcp-pubsub_${scala.binary.version}</artifactId>
+       <name>flink-examples-streaming-gcp-pubsub</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <finalName>PubSub</finalName>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-deploy-plugin</artifactId>
+                               <configuration>
+                                       <skip>true</skip>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>false</shadeTestJar>
+                                                       
<shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       
<createDependencyReducedPom>false</createDependencyReducedPom>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.examples.gcp.pubsub.PubSubExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>org.apache.flink:flink-connector-gcp-pubsub*</include>
+                                                                       
<include>com.google.cloud:google-cloud-pubsub</include>
+                                                                       
<include>com.google.*:*</include>
+                                                                       
<include>org.threeten:*</include>
+                                                                       
<include>io.grpc:*</include>
+                                                                       
<include>io.opencensus:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>com.google</pattern>
+                                                                       
<shadedPattern>org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>org.apache.flink:flink-examples-streaming_*</artifact>
+                                                                       
<includes>
+                                                                               
<include>org/apache/flink/streaming/examples/gcp/pubsub/**</include>
+                                                                       
</includes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git 
a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
new file mode 100644
index 0000000..3c1eab4
--- /dev/null
+++ 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * Schema that converts between {@link Integer} values and the PubSub payloads produced by
+ * {@link PubSubPublisher}.
+ *
+ * <p>The payload is the byte representation returned by {@link BigInteger#toByteArray()}
+ * (big-endian two's-complement). The class is used both for reading messages
+ * ({@link PubSubDeserializationSchema}) and for writing them ({@link SerializationSchema}).
+ */
+class IntegerSerializer implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {
+
+       /**
+        * Decodes the payload of a PubSub message into an integer.
+        *
+        * <p>Payloads wider than 32 bits are narrowed by {@link BigInteger#intValue()}.
+        *
+        * @param message the received message; its data must hold a single integer
+        * @return the decoded integer
+        * @throws IOException if the message carries no payload bytes
+        */
+       @Override
+       public Integer deserialize(PubsubMessage message) throws IOException {
+               final byte[] payload = message.getData().toByteArray();
+               // new BigInteger(byte[]) rejects an empty array with an unchecked NumberFormatException;
+               // report it through the declared checked exception instead.
+               if (payload.length == 0) {
+                       throw new IOException("Cannot deserialize an Integer from an empty PubSub message payload");
+               }
+               return new BigInteger(payload).intValue();
+       }
+
+       /**
+        * The example stream is unbounded: no element marks the end of the stream.
+        *
+        * @param integer the element that was just deserialized (unused)
+        * @return always {@code false}
+        */
+       @Override
+       public boolean isEndOfStream(Integer integer) {
+               return false;
+       }
+
+       /**
+        * @return type information for {@link Integer}
+        */
+       @Override
+       public TypeInformation<Integer> getProducedType() {
+               return TypeInformation.of(Integer.class);
+       }
+
+       /**
+        * Encodes an integer in the format expected by {@link #deserialize(PubsubMessage)}.
+        *
+        * @param integer the value to encode
+        * @return the bytes returned by {@link BigInteger#toByteArray()} for the value
+        */
+       @Override
+       public byte[] serialize(Integer integer) {
+               return BigInteger.valueOf(integer).toByteArray();
+       }
+}
diff --git 
a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
new file mode 100644
index 0000000..7b66577
--- /dev/null
+++ 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting a Flink job it publishes 10 messages on the input topic.
+ *
+ * <p>Then a Flink job is started that reads messages from the input subscription,
+ * logs each payload and writes it to the output topic.
+ */
+public class PubSubExample {
+       private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+       /**
+        * Number of named arguments read via {@code getRequired} in {@link #main(String[])}:
+        * google-project, input-topicName, input-subscription and output-topicName.
+        */
+       private static final int REQUIRED_PARAMETER_COUNT = 4;
+
+       /**
+        * Publishes the test messages and then runs the Flink job.
+        *
+        * @param args command line arguments, see the usage message below
+        * @throws Exception if publishing or the Flink job fails
+        */
+       public static void main(String[] args) throws Exception {
+               // parse input arguments
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               // All four arguments are required; print the usage text instead of failing later in getRequired().
+               if (parameterTool.getNumberOfParameters() < REQUIRED_PARAMETER_COUNT) {
+                       System.out.println("Missing parameters!\n" +
+                                                               "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> " +
+                                                               "--google-project <google project name> ");
+                       return;
+               }
+
+               String projectName = parameterTool.getRequired("google-project");
+               String inputTopicName = parameterTool.getRequired("input-topicName");
+               String subscriptionName = parameterTool.getRequired("input-subscription");
+               String outputTopicName = parameterTool.getRequired("output-topicName");
+
+               // Seed the input topic so the job has something to read.
+               PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+               pubSubPublisher.publish(10);
+
+               runFlinkJob(projectName, subscriptionName, outputTopicName);
+       }
+
+       /**
+        * Builds and executes the streaming job: PubSub source -> logging map -> PubSub sink.
+        *
+        * @param projectName      Google Cloud project that owns the subscription and the topic
+        * @param subscriptionName subscription to read the integers from
+        * @param outputTopicName  topic the integers are written to
+        * @throws Exception if building the connectors or executing the job fails
+        */
+       private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(1000L);
+
+               env.addSource(PubSubSource.newBuilder(Integer.class)
+                                                               .withDeserializationSchema(new IntegerSerializer())
+                                                               .withProjectName(projectName)
+                                                               .withSubscriptionName(subscriptionName)
+                                                               .build())
+                       .map(PubSubExample::printAndReturn).disableChaining()
+                       .addSink(PubSubSink.newBuilder(Integer.class)
+                                                               .withSerializationSchema(new IntegerSerializer())
+                                                               .withProjectName(projectName)
+                                                               .withTopicName(outputTopicName).build());
+
+               env.execute("Flink Streaming PubSubReader");
+       }
+
+       /**
+        * Logs the payload and passes it on unchanged.
+        *
+        * @param i the payload of the processed message
+        * @return the same value {@code i}
+        */
+       private static Integer printAndReturn(Integer i) {
+               LOG.info("Processed message with payload: {}", i);
+               return i;
+       }
+}
diff --git 
a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..8f7bfe6
--- /dev/null
+++ 
b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+/**
+ * Helper class to send PubSubMessages to a PubSub topic.
+ */
+class PubSubPublisher {
+       private final String projectName;
+       private final String topicName;
+
+       /**
+        * @param projectName Google Cloud project that owns the topic
+        * @param topicName   topic the messages are published to
+        */
+       PubSubPublisher(String projectName, String topicName) {
+               this.projectName = projectName;
+               this.topicName = topicName;
+       }
+
+       /**
+        * Publishes messages whose payload is a single integer.
+        * The integers inside the messages start from 0 and increase by one for each message sent.
+        *
+        * <p>Each publish call is awaited before the next message is sent, with a pause of
+        * 100 ms between messages.
+        *
+        * @param amountOfMessages amount of messages to send
+        * @throws RuntimeException wrapping any failure while creating the publisher or publishing;
+        *                          if the thread was interrupted, its interrupt flag is set again first
+        */
+       void publish(int amountOfMessages) {
+               Publisher publisher = null;
+               try {
+                       publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+                       for (int i = 0; i < amountOfMessages; i++) {
+                               ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
+                               PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+                               // Block until the publish call has completed.
+                               publisher.publish(message).get();
+
+                               System.out.println("Published message: " + i);
+                               Thread.sleep(100L);
+                       }
+               } catch (InterruptedException e) {
+                       // Restore the interrupt flag so callers can still observe the interruption.
+                       Thread.currentThread().interrupt();
+                       throw new RuntimeException(e);
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               } finally {
+                       if (publisher != null) {
+                               try {
+                                       publisher.shutdown();
+                               } catch (Exception e) {
+                                       // Best effort: a failed shutdown must not mask an exception from the try block.
+                                       System.err.println("Failed to shut down PubSub publisher: " + e);
+                               }
+                       }
+               }
+       }
+}

Reply via email to