This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git

commit 6c5cd555525a0d9e663b5b88e12ed594520c9e66
Author: Jiangjie (Becket) Qin <becket....@gmail.com>
AuthorDate: Fri Jul 5 01:05:41 2019 +0800

    [FLINK-9311] [pubsub] Improvements to builders + minor improvement to PubSubSink flush logic
---
 .../org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
index 7b66577..a960176 100644
--- a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -63,13 +63,13 @@ public class PubSubExample {
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(1000L);
 
-               env.addSource(PubSubSource.newBuilder(Integer.class)
+               env.addSource(PubSubSource.newBuilder()
                                                                .withDeserializationSchema(new IntegerSerializer())
                                                                .withProjectName(projectName)
                                                                .withSubscriptionName(subscriptionName)
                                                                .build())
                        .map(PubSubExample::printAndReturn).disableChaining()
-                       .addSink(PubSubSink.newBuilder(Integer.class)
+                       .addSink(PubSubSink.newBuilder()
                                                                .withSerializationSchema(new IntegerSerializer())
                                                                .withProjectName(projectName)
                                                                .withTopicName(outputTopicName).build());

Reply via email to