Beam 2.14.0 will include support for writing files in the fileio module
(the support will include GCS, local files, HDFS). It will also support
streaming. The transform is still marked as experimental, and is likely to
receive improvements - but you can check it out for your pipelines, and see
if it helps you : )
Best
-P.

On Fri, Jul 19, 2019 at 12:24 PM Valentyn Tymofieiev <valen...@google.com>
wrote:

> As of today, Beam Python streaming does not support writing to GCS yet,
> which  explains
> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>  .
>
> You are right - id_label and timestamp_attribute does not work on Direct
> runner yet as per https://issues.apache.org/jira/browse/BEAM-4275, I
> checked with a few folks and that seems to be the current status, but you
> can still give them a try on Dataflow runner.
>
> You may also find the following examples helpful:
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
> (streaming pipeline).
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
>  (batch
> pipeline)
>
> https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
> .
>
> On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
> matthew.dar...@carfinance247.co.uk> wrote:
>
>> Hi Valentyn,
>>
>> Thank you for your reply. I'm already using the with_attributes=True
>> option, however this returns the attributes property of the JSON, i.e :-
>>
>> {
>>>       "attributes": {
>>>         "source": "python"
>>>       }
>>>
>>
>>
>> My pipeline currently looks like this (id_label is commented out when
>> running directly, as it causes a not implemented error):-
>>
>> messages = (p
>> | 'Read From Pub Sub' >> ReadFromPubSub(subscription
>> =known_args.input_subscription,with_attributes=True)
>> #,id_label='message_id')
>> | 'Parse JSON' >> beam.Map(format_message_element)
>>
>> My function to parse the message looks like this:-
>>
>> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>> messagedict = json.loads(message.data)
>> rownumber = messagedict['rownumber']
>> fullmessage = {'data' : json.dumps(message.data),
>> 'rownumber' : int(rownumber),
>> 'attributes' : json.dumps(message.attributes),
>> 'timestamp' : float(timestamp)}
>>
>> logging.info(message.attributes)
>> logging.info(message)
>>
>> return (rownumber, fullmessage)
>>
>> I'm aware there are the id_label and with_timestamp parameters for the
>> ReadFromPubSub method, however, these don't seem to work with the direct
>> runner, as per
>> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
>> which makes testing somewhat difficult.
>>
>> My full code is attached, when running above 2.9.0 of the SDK I can't get
>> passed the windowing function, due to an issue that appears related to this
>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>> and this
>> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
>> as I was receiving the following error when running on 2.12.0:
>>
>> Cannot convert GlobalWindow to
>> apache_beam.utils.windowed_value._IntervalWindowBase [while running
>> 'generatedPtransform-150']
>>
>> On 2.9.0 when running on the local runner, I receive the following output
>> from the logging.info calls in format_message_element:
>>
>> INFO:root:{u'source': u'python'}
>> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>>
>> I was expecting the messageId and publishTime as part of the object
>> returned; but as you can see there's nothing there for those attributes.
>>
>> (The code does not quite map correctly to the BigQuery table so it fails
>> inserts at that point, which I'm currently trying to resolve!)
>>
>> Kind regards,
>>
>> Matthew
>>
>>
>>
>> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>>
>> *This message originated from outside your organization*
>> ------------------------------
>> Hi Matthew,
>>
>> Welcome to Beam!
>>
>> Looking at Python PubSub IO API, you should be able to access id and
>> timestamp by setting `with_attributes=True` when using `ReadFromPubSub`
>> PTransform, see [1,2].
>>
>> [1]
>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
>> [2]
>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>>
>> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <
>> matthew.dar...@carfinance247.co.uk> wrote:
>>
>> Good morning,
>>
>> I'm very new to Beam, and pretty new to Python so please first accept my
>> apologies for any obvious misconceptions/mistakes in the following.
>>
>> I am currently trying to develop a sample pipeline in Python to pull
>> messages from Pub/Sub and then write them to either files in cloud storage
>> or to BigQuery. The ultimate goal will be to utilise the pipeline for real
>> time streaming of event data to BigQuery (with various transformations) but
>> also to store the raw messages long term in files in cloud storage.
>>
>> At the moment, I'm simply trying to parse the message to get the PubSub
>> messageId and publishTime in order to be able to write them into the
>> output. The json of my PubSub message looks like this:-
>>
>> [
>>   {
>>     "ackId":
>> "BCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIICBQFfH1xU1t1Xl8aB1ENGXJ8Zyc_XxcIB0BTeFVaEQx6bVxXOFcMEHF8YXZpWhUIA0FTfXeq5cveluzJNksxIbvE8KxfeqqmgfhiZho9XxJLLD5-PT5FQV5AEkw2C0RJUytDCypYEU4",
>>     "message": {
>>       "attributes": {
>>         "source": "python"
>>       },
>>       "data": "eyJyb3dudW1iZXIiOiAyfQ==",
>>       "messageId": "619310330691403",
>>       "publishTime": "2019-07-12T08:27:58.522Z"
>>     }
>>   }
>> ]
>> According to the documentation
>> <https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html>
>> the PubSub message payload returns the *data* and *attributes*
>> properties; is there simply no way of retrieving the messageId and
>> publishTime, or are these exposed somewhere else? If not, will the
>> inclusion of these be in the roadmap, and are they available if using Java
>> (I have zero Java experience hence why reaching for Python first).
>>
>> Kind regards,
>>
>> Matthew
>>
>>
>>
>>
>> ---------- Forwarded message ----------
>> From: Domain postMaster address <postmas...@carfinance247.co.uk>
>> To: "user@beam.apache.org" <user@beam.apache.org>
>> Cc:
>> Bcc:
>> Date: Mon, 15 Jul 2019 09:12:15 +0000
>> Subject: ApacheBeamDataflow-To-BigQuery2_py was removed from this message
>>
>> [image: Logo]
>>   We removed a file from this message
>>
>> Your organization's email policy doesn't permit this type of file. If you
>> need it, please contact your administrator.
>>
>>
>> File Details
>>
>> *ApacheBeamDataflow-To-BigQuery2.py* (6559 bytes)
>>
>>
>>   © 2003 - 2019 Mimecast Services Limited.
>>
>>
>>
>

Reply via email to