[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17131290#comment-17131290 ]

Beam JIRA Bot commented on BEAM-7819:
-------------------------------------

This issue was marked "stale-assigned" and has not received a public comment in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Priority: P2
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> User reported issue:
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained Python eyes, I think if the
> intention is to include the message id and the publish time in the attributes
> attribute of the PubSubMessage type, then the protobuf mapping is missing
> something:-
>   @staticmethod
>   def _from_proto_str(proto_msg):
>     """Construct from serialized form of ``PubsubMessage``.
>     Args:
>       proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>     Returns:
>       A new PubsubMessage object.
>     """
>     msg = pubsub.types.pubsub_pb2.PubsubMessage()
>     msg.ParseFromString(proto_msg)
>     # Convert ScalarMapContainer to dict.
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:-
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed as
> they are separate from the attributes. Perhaps the PubsubMessage class needs
> expanding to include these as attributes, or they would need adding to the
> dictionary for attributes. This would only need doing for the _from_proto_str
> as obviously they would not need to be populated when transmitting a message
> to PubSub.
> My Python is not great, I'm assuming the latter option would need to look
> something like this?
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     attributes.update({'message_id': msg.message_id, 'publish_time':
>                        msg.publish_time})
>     return PubsubMessage(msg.data, attributes)
> """

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
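For comparison, the first option raised in the description (expanding the message type rather than merging into the attributes dict) might look roughly like the sketch below. Everything in it is an assumption for illustration: the namedtuple is a hypothetical stand-in for the SDK class, `from_parsed_proto` is a made-up helper name, and a `SimpleNamespace` stands in for the parsed protobuf so the sketch runs without the Pub/Sub client library. One reason to prefer separate fields is that, in the protobuf definition, `publish_time` is a Timestamp message rather than a string, so it would need converting before it could sit in a string-valued attributes map.

```python
from collections import namedtuple
from types import SimpleNamespace

# Hypothetical extended message type. The two extra fields are an
# assumption made for this sketch, not the API quoted in the issue.
PubsubMessage = namedtuple(
    "PubsubMessage",
    ["data", "attributes", "message_id", "publish_time"],
    defaults=(None, None),  # the extra fields stay optional
)


def from_parsed_proto(msg):
    """Build a PubsubMessage from an already-parsed, proto-like object."""
    # Copy the map-like attributes container into a plain dict.
    attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
    # Keep the server-generated fields separate from user attributes.
    return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time)


# Stand-in for a parsed google.pubsub.v1.PubsubMessage (made-up values).
fake_proto = SimpleNamespace(
    data=b"payload",
    attributes={"origin": "test"},
    message_id="1234",
    publish_time="2019-07-25T10:00:00Z",
)
parsed = from_parsed_proto(fake_proto)
outgoing = PubsubMessage(b"payload", {"origin": "test"})
```

Keeping the fields out of `attributes` also avoids clobbering a user attribute that happens to be named `message_id`, and the defaults mean a message built for publishing does not have to supply them.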
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17121959#comment-17121959 ]

Kenneth Knowles commented on BEAM-7819:
---------------------------------------

This issue is assigned but has not received an update in 30 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Assignee: Matthew Darwin
>            Priority: P2
>              Labels: stale-assigned
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923683#comment-16923683 ]

Udi Meiri commented on BEAM-7819:
---------------------------------

Also, you might get inconsistent results from Dataflow: sometimes the message_id and publish_time will be filled and sometimes they will be empty. The feature is currently being rolled out.

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Assignee: Matthew Darwin
>            Priority: Major
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
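Because message_id and publish_time are generated by Pub/Sub and, per the comment above, may or may not be populated, one way a test could stay stable is to compare only the fields the publisher controls. The following is a minimal sketch under that assumption; `Received`, `publisher_view` and `same_payloads` are hypothetical names invented here, not part of the Beam test utilities.

```python
from collections import namedtuple

# Hypothetical shape of a received message, for illustration only.
Received = namedtuple(
    "Received", ["data", "attributes", "message_id", "publish_time"])


def publisher_view(msg):
    """Reduce a message to the parts the publisher controls."""
    # Sort attribute items so that dict ordering does not matter.
    return (msg.data, tuple(sorted(msg.attributes.items())))


def same_payloads(expected, actual):
    """Order-insensitive comparison that ignores server-generated fields."""
    return (sorted(publisher_view(m) for m in expected) ==
            sorted(publisher_view(m) for m in actual))


expected = [
    Received(b"a", {"k": "v"}, None, None),
    Received(b"b", {}, None, None),
]
# One message arrives with the generated fields filled, one without.
actual = [
    Received(b"b", {}, "", None),
    Received(b"a", {"k": "v"}, "42", "2019-09-05T12:00:00Z"),
]
matches = same_payloads(expected, actual)
```

A stricter test could additionally assert that the generated fields are non-empty, but only once they are reliably populated by the runner.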
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923665#comment-16923665 ]

Udi Meiri commented on BEAM-7819:
---------------------------------

Adding '--kms_key_name ""' should fix the issue.
Don't forget to rerun "python setup.py sdist" to recreate the tarball passed to sdk_location every time.
I'm not sure what the worker_jar does but I'd leave it in. :)

{code}
python setup.py sdist && ./scripts/run_integration_test.sh \
  --project [my-project] \
  --gcs_location gs://[my-project]/tmp \
  --test_opts --tests=apache_beam.io.gcp.pubsub_integration_test \
  --runner TestDataflowRunner \
  --sdk_location ./dist/apache-beam-2.16.0.dev0.tar.gz \
  --worker_jar ../../runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-fn-api-worker-2.16.0-SNAPSHOT.jar \
  --kms_key_name=""
{code}

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Assignee: Matthew Darwin
>            Priority: Major
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923646#comment-16923646 ]

Matthew Darwin commented on BEAM-7819:
--------------------------------------

Struggling a little with running the integration test locally; does it need both the tarball to be built and the worker_jar?

./scripts/run_integration_test.sh --project [my-project] --gcs_location gs://[my-project]/tmp --test_opts --tests=apache_beam.io.gcp.pubsub_integration_test --runner TestDataflowRunner --sdk_location ./dist/apache-beam-2.16.0.dev0.tar.gz

{{AssertionError: }}
{{Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)}}
{{ but: Test pipeline expected terminated in state: RUNNING Test pipeline job terminated in state: FAILED}}

In addition, for the pubsub integration test, given that Pub/Sub will generate the message_id and publish_time, I'm not sure how exactly to handle that in the expected messages.

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893188#comment-16893188 ]

Ahmet Altay commented on BEAM-7819:
-----------------------------------

BEAM-3489 is for Java; this is for Python. Otherwise they look similar.

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-python-gcp
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>            Priority: Major
[jira] [Commented] (BEAM-7819) PubsubMessage message parsing is lacking non-attribute fields
[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16893184#comment-16893184 ]

Jason Ganetsky commented on BEAM-7819:
--------------------------------------

This looks like a dupe: https://issues.apache.org/jira/browse/BEAM-3489

> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
>                 Key: BEAM-7819
>                 URL: https://issues.apache.org/jira/browse/BEAM-7819
>             Project: Beam
>          Issue Type: Bug
>          Components: io-python-gcp
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>            Priority: Major