This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 9d7045c [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings 9d7045c is described below commit 9d7045c47b45814fa0b734bf3784df06dd70a490 Author: Richard Deurwaarder <rdeurwaar...@bol.com> AuthorDate: Sun Jul 7 14:18:46 2019 +0200 [FLINK-13133] [pubsub] Fix small error in PubSub documentation relating to PubSubSink serializer and emulator settings --- docs/dev/connectors/pubsub.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/dev/connectors/pubsub.md b/docs/dev/connectors/pubsub.md index 0ee8187..59b3e00 100644 --- a/docs/dev/connectors/pubsub.md +++ b/docs/dev/connectors/pubsub.md @@ -94,7 +94,7 @@ DataStream<SomeObject> dataStream = (...); SerializationSchema<SomeObject> serializationSchema = (...); SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder() - .withDeserializationSchema(deserializer) + .withSerializationSchema(serializationSchema) .withProjectName("project") .withSubscriptionName("subscription") .build() @@ -120,18 +120,20 @@ The following example shows how you would create a source to read messages from <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} +String hostAndPort = "localhost:1234"; DeserializationSchema<SomeObject> deserializationSchema = (...); SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder() .withDeserializationSchema(deserializationSchema) .withProjectName("my-fake-project") .withSubscriptionName("subscription") - .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234", "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100)) + .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(hostAndPort, "my-fake-project", "subscription", 10, Duration.ofSeconds(15), 100)) .build(); +SerializationSchema<SomeObject> serializationSchema = (...); SinkFunction<SomeObject> pubsubSink = PubSubSink.newBuilder() - .withDeserializationSchema(deserializationSchema) + .withSerializationSchema(serializationSchema) .withProjectName("my-fake-project") .withSubscriptionName("subscription") - .withHostAndPortForEmulator(getPubSubHostPort()) + .withHostAndPortForEmulator(hostAndPort) .build() StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();