[ 
https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=290210&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290210
 ]

ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Aug/19 04:29
            Start Date: 07/Aug/19 04:29
    Worklog Time Spent: 10m 
      Work Description: chadrik commented on pull request #9268: [BEAM-7738] Add external transform support to PubSubIO
URL: https://github.com/apache/beam/pull/9268#discussion_r311364462
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 ##########
 @@ -674,6 +680,85 @@ public String toString() {
       abstract Builder<T> setClock(@Nullable Clock clock);
 
       abstract Read<T> build();
+
+      @Override
+      public PTransform<PBegin, PCollection<T>> buildExternal(External.Configuration config) {
+        if (config.topic != null) {
+          StaticValueProvider<String> topic = StaticValueProvider.of(utf8String(config.topic));
+          setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator()));
+        }
+        if (config.subscription != null) {
+          StaticValueProvider<String> subscription =
+              StaticValueProvider.of(utf8String(config.subscription));
+          setSubscriptionProvider(
+              NestedValueProvider.of(subscription, new SubscriptionTranslator()));
+        }
+        if (config.idAttribute != null) {
+          String idAttribute = utf8String(config.idAttribute);
+          setIdAttribute(idAttribute);
+        }
+        if (config.timestampAttribute != null) {
+          String timestampAttribute = utf8String(config.timestampAttribute);
+          setTimestampAttribute(timestampAttribute);
+        }
+        setNeedsAttributes(config.needsAttributes);
+        setPubsubClientFactory(FACTORY);
+        if (config.needsAttributes) {
+          SimpleFunction<PubsubMessage, T> parseFn =
+              (SimpleFunction<PubsubMessage, T>) new IdentityMessageFn();
+          setParseFn(parseFn);
+          // FIXME: call setCoder(). need to use PubsubMessage proto to be compatible with python
 
 Review comment:
   I serialized the `PubsubMessage` using protobufs. There's no cross-language 
coder for `PubsubMessage`, and I assumed adding one would be overreach, so I 
used the bytes coder and handle the conversion to and from protobufs in code 
that lives close to the transforms. 
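
A minimal sketch of the general idea, not the code from this PR: the message is flattened to a plain `byte[]` next to the transform, so only a bytes coder has to be understood on both sides of the language boundary. To keep the example dependency-free it uses a hand-rolled length-prefixed layout as a stand-in for the protobuf serialization described above. The class `MessageBytes` and its methods are hypothetical names, not Beam or Pub/Sub APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Beam): converts a payload plus attributes
// to and from a single byte[] so that a plain bytes coder can carry it.
// The real change uses the PubsubMessage protobuf; this layout is a stand-in.
final class MessageBytes {

  // Simple holder for the decoded parts of a message.
  static final class Decoded {
    final byte[] payload;
    final Map<String, String> attributes;

    Decoded(byte[] payload, Map<String, String> attributes) {
      this.payload = payload;
      this.attributes = attributes;
    }
  }

  // Layout: payload length, payload bytes, attribute count, then key/value pairs.
  static byte[] encode(byte[] payload, Map<String, String> attributes) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buffer);
      out.writeInt(payload.length);
      out.write(payload);
      out.writeInt(attributes.size());
      for (Map.Entry<String, String> entry : attributes.entrySet()) {
        out.writeUTF(entry.getKey());
        out.writeUTF(entry.getValue());
      }
      out.flush();
      return buffer.toByteArray();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  // Reads the fields back in the same order they were written.
  static Decoded decode(byte[] wire) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      int count = in.readInt();
      Map<String, String> attributes = new LinkedHashMap<>();
      for (int i = 0; i < count; i++) {
        String key = in.readUTF();
        attributes.put(key, in.readUTF());
      }
      return new Decoded(payload, attributes);
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }
}
```

With protobufs the same shape applies: the encode step runs just before the bytes leave the Java transform, and the other SDK runs the matching decode step right after it receives them.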
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 290210)
    Time Spent: 1h  (was: 50m)

> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
>                 Key: BEAM-7738
>                 URL: https://issues.apache.org/jira/browse/BEAM-7738
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp, runner-flink, sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we 
> should add support for PubSub.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
