[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923665#comment-16923665 ]
Udi Meiri commented on BEAM-7819:
---------------------------------

Adding '--kms_key_name ""' should fix the issue.
Don't forget to rerun "python setup.py sdist" to recreate the tarball passed to sdk_location every time.
I'm not sure what the worker_jar does but I'd leave it in. :)

{code}
python setup.py sdist && ./scripts/run_integration_test.sh \
  --project [my-project] \
  --gcs_location gs://[my-project]/tmp \
  --test_opts --tests=apache_beam.io.gcp.pubsub_integration_test \
  --runner TestDataflowRunner \
  --sdk_location ./dist/apache-beam-2.16.0.dev0.tar.gz \
  --worker_jar ../../runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-fn-api-worker-2.16.0-SNAPSHOT.jar \
  --kms_key_name=""
{code}

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Assignee: Matthew Darwin
>            Priority: Major
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> User reported issue:
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained python eyes, I think if the
> intention is to include the message id and the publish time in the attributes
> attribute of the PubSubMessage type, then the protobuf mapping is missing
> something:-
>   @staticmethod
>   def _from_proto_str(proto_msg):
>     """Construct from serialized form of ``PubsubMessage``.
>     Args:
>       proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>     Returns:
>       A new PubsubMessage object.
>     """
>     msg = pubsub.types.pubsub_pb2.PubsubMessage()
>     msg.ParseFromString(proto_msg)
>     # Convert ScalarMapContainer to dict.
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:-
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed as
> they are separate from the attributes. Perhaps the PubsubMessage class needs
> expanding to include these as attributes, or they would need adding to the
> dictionary for attributes. This would only need doing for the _from_proto_str
> as obviously they would not need to be populated when transmitting a message
> to PubSub.
> My python is not great, I'm assuming the latter option would need to look
> something like this?
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     attributes.update({'message_id': msg.message_id, 'publish_time':
>     msg.publish_time})
>     return PubsubMessage(msg.data, attributes)
> """

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
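
For anyone who wants to try the attribute-merging idea from the quoted report without a Dataflow run, here is a minimal standalone sketch. It is not the Beam implementation: `Timestamp`, `ParsedMessage` and `attributes_with_metadata` are hypothetical stand-ins made up for this example, and the only thing taken from the protobuf definition is that the message carries `data`, `attributes`, `message_id` and `publish_time` fields.

```python
from collections import namedtuple

# Hypothetical stand-ins for the parsed protobuf message (assumption: only the
# field names mirror google.pubsub.v1.PubsubMessage; these are not Beam types).
Timestamp = namedtuple("Timestamp", ["seconds", "nanos"])
ParsedMessage = namedtuple(
    "ParsedMessage", ["data", "attributes", "message_id", "publish_time"])


def attributes_with_metadata(msg):
  """Copy msg.attributes into a dict and add the two non-attribute fields.

  Hypothetical helper illustrating the "latter option" from the report.
  """
  # Same conversion as in the quoted code: build a plain dict from the map.
  attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
  # Add the fields that live outside the attributes map.
  attributes.update({
      "message_id": msg.message_id,
      "publish_time": msg.publish_time,
  })
  return attributes


msg = ParsedMessage(
    data=b"payload",
    attributes={"origin": "test"},
    message_id="12345",
    publish_time=Timestamp(seconds=1567700000, nanos=0))
merged = attributes_with_metadata(msg)
```

Two things to keep in mind with this approach: a user attribute that happens to be named message_id or publish_time would be overwritten, and publish_time is a timestamp message rather than a string, so the merged dict would no longer be a pure string-to-string map. Extending the PubsubMessage class with separate fields, the other option mentioned in the report, avoids both.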