This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 9d7045c  [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
9d7045c is described below

commit 9d7045c47b45814fa0b734bf3784df06dd70a490
Author: Richard Deurwaarder <rdeurwaar...@bol.com>
AuthorDate: Sun Jul 7 14:18:46 2019 +0200

    [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings
---
 docs/dev/connectors/pubsub.md | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md
index 0ee8187..59b3e00 100644
--- a/docs/dev/connectors/pubsub.md
+++ b/docs/dev/connectors/pubsub.md
@@ -94,7 +94,7 @@ DataStream<SomeObject> dataStream = (...);
 
 SerializationSchema<SomeObject> serializationSchema = (...);
 SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
-                                                .withDeserializationSchema(deserializer)
+                                                .withSerializationSchema(serializationSchema)
                                                 .withProjectName("project")
                                                 .withSubscriptionName("subscription")
                                                 .build()
@@ -120,18 +120,20 @@ The following example shows how you would create a source to read messages from
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
+String hostAndPort = "localhost:1234";
 DeserializationSchema<SomeObject> deserializationSchema = (...);
 SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
                                                       .withDeserializationSchema(deserializationSchema)
                                                       .withProjectName("my-fake-project")
                                                       .withSubscriptionName("subscription")
-                                                      .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
+                                                      .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100))
                                                       .build();
+SerializationSchema<SomeObject> serializationSchema = (...);
 SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder()
-                                                .withDeserializationSchema(deserializationSchema)
+                                                .withSerializationSchema(serializationSchema)
                                                 .withProjectName("my-fake-project")
                                                 .withSubscriptionName("subscription")
-                                                .withHostAndPortForEmulator(getPubSubHostPort())
+                                                .withHostAndPortForEmulator(hostAndPort)
                                                 .build()
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
