algirdas-k commented on issue #20950: URL: https://github.com/apache/beam/issues/20950#issuecomment-1367158844
It seems a lot of the work needed to solve this is already done:

- The [code link](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java#L44) in the issue description points to an empty line.
- Beam `PubsubMessage` [supports](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java#L65-L70) setting and getting the ordering key.
- A coder exists, `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder`, which [encodes and decodes the ordering key](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java#L67).
- The pipeline (at least the unbounded sink) passes the ordering key along:
  - During the [mapping](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java#L30) from Beam `PubsubMessage` to the GCP proto `PubsubMessage` and then to `byte[]`, the ordering key is [mapped](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java#L44-L48) (via [PubsubUnboundedSink.expand](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L484-L487)).
  - Later, when `PubsubUnboundedSink` [writes](https://github.com/apache/beam/blob/v2.43.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L499-L527) these `byte[]` to Pub/Sub, it [uses](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L514) [OutgoingMessageCoder](https://github.com/apache/beam/blob/v2.43.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L98-L118), which does not lose the ordering key because it encodes the whole proto message.

So technically everything is in place and should work, as long as the correct coder (one that maps the ordering key) is used. However, the ordering key is written when running on `DirectRunner`, but it is missing when running on `DataflowRunner`.

---

We have a reproduction of this strange behavior:

### Case 1

Using a custom coder + `DirectRunner` -> the ordering key is written to Pub/Sub

```java
messages.apply(
    PubsubIO.writeMessages().to(topic)
);
```

### Case 2

Deploying the same pipeline from **Case 1** on `DataflowRunner` -> the ordering key is missing

### Case 3

Copy/pasting the `PubsubUnboundedSink` code into our namespace and using it directly to write to Pub/Sub, running on `DataflowRunner` -> **the ordering key is written to Pub/Sub**

```java
messages.apply(
    "WriteMessages",
    new PubsubUnboundedSinkCopy(
        PubsubJsonClient.FACTORY,
        ...
```

Maybe someone could give some insight:

- What pieces are missing for `PubsubIO.writeMessages()` to work on `DataflowRunner`?
- Or why do we get the ordering key written to Pub/Sub when we copy/paste the sink?
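To illustrate why "as long as the correct coder is used" matters, here is a minimal, dependency-free sketch. It is not Beam code: `OrderingKeyCoderSketch`, `Message`, `encode` and `decode` are hypothetical names, and the wire format is made up for illustration. It only shows the general failure mode: a coder that serializes the payload and attributes but not the ordering key silently drops the key on a round trip. A coder that encodes the whole message, as `OutgoingMessageCoder` does with the proto, keeps it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; NOT Beam's PubsubMessage or any real Beam coder. */
public class OrderingKeyCoderSketch {

  /** Simplified, hypothetical stand-in for a Pub/Sub message. */
  public static final class Message {
    public final byte[] payload;
    public final Map<String, String> attributes;
    public final String orderingKey; // null means "no ordering key"

    public Message(byte[] payload, Map<String, String> attributes, String orderingKey) {
      this.payload = payload;
      this.attributes = new TreeMap<>(attributes);
      this.orderingKey = orderingKey;
    }
  }

  /** Encodes a message; when includeOrderingKey is false the key is silently dropped. */
  public static byte[] encode(Message m, boolean includeOrderingKey) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(m.payload.length);
      out.write(m.payload);
      out.writeInt(m.attributes.size());
      for (Map.Entry<String, String> e : m.attributes.entrySet()) {
        out.writeUTF(e.getKey());
        out.writeUTF(e.getValue());
      }
      if (includeOrderingKey) {
        // Presence flag first, so an absent key decodes back to null.
        out.writeBoolean(m.orderingKey != null);
        if (m.orderingKey != null) {
          out.writeUTF(m.orderingKey);
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes bytes produced by encode() with the same includeOrderingKey setting. */
  public static Message decode(byte[] encoded, boolean includeOrderingKey) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      int attributeCount = in.readInt();
      Map<String, String> attributes = new TreeMap<>();
      for (int i = 0; i < attributeCount; i++) {
        String key = in.readUTF();
        String value = in.readUTF();
        attributes.put(key, value);
      }
      String orderingKey = null;
      if (includeOrderingKey && in.readBoolean()) {
        orderingKey = in.readUTF();
      }
      return new Message(payload, attributes, orderingKey);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Message original = new Message(
        "hello".getBytes(StandardCharsets.UTF_8), Map.of("source", "demo"), "customer-42");
    Message lossy = decode(encode(original, false), false);
    Message full = decode(encode(original, true), true);
    System.out.println("lossy coder ordering key: " + lossy.orderingKey); // prints: ... null
    System.out.println("full coder ordering key: " + full.orderingKey); // prints: ... customer-42
  }
}
```

The sketch does not explain the `DirectRunner` vs `DataflowRunner` difference by itself. It only shows that any step that re-encodes the message with a coder lacking the ordering-key field will lose the key without raising an error.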
