Re: [python SDK] Returning Pub/Sub message_id and timestamp

2019-07-19 Thread Valentyn Tymofieiev
Also, see
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py
which
involves both PubSub and BigQuery IOs.
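
If it helps, here is a minimal, untested sketch of the BigQuery write step for a
streaming pipeline like the one discussed below; the table spec, schema string,
and field names are hypothetical and should be adjusted to your project:

import apache_beam as beam

def write_rows_to_bigquery(rows):
    # 'rows' is a PCollection of dicts whose keys match the schema below.
    return rows | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.pubsub_messages',  # hypothetical table spec
        schema='rownumber:INTEGER,data:STRING,attributes:STRING,timestamp:FLOAT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

In a streaming pipeline this should default to BigQuery streaming inserts.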

On Fri, Jul 19, 2019 at 12:31 PM Pablo Estrada  wrote:

> Beam 2.14.0 will include support for writing files in the fileio module
> (the support will include GCS, local files, HDFS). It will also support
> streaming. The transform is still marked as experimental, and is likely to
> receive improvements - but you can check it out for your pipelines, and see
> if it helps you : )
> Best
> -P.
>
> On Fri, Jul 19, 2019 at 12:24 PM Valentyn Tymofieiev 
> wrote:
>
>> As of today, Beam Python streaming does not support writing to GCS yet,
>> which  explains
>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>  .
>>
>> You are right - id_label and timestamp_attribute do not work on the Direct
>> runner yet as per https://issues.apache.org/jira/browse/BEAM-4275. I
>> checked with a few folks and that seems to be the current status, but you
>> can still give them a try on the Dataflow runner.
>>
>> You may also find the following examples helpful:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
>> (streaming pipeline).
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
>>  (batch
>> pipeline)
>>
>> https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
>> .
>>
>> On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
>> matthew.dar...@carfinance247.co.uk> wrote:
>>
>>> Hi Valentyn,
>>>
>>> Thank you for your reply. I'm already using the with_attributes=True
>>> option, however this returns the attributes property of the JSON, i.e :-
>>>
>>> {
   "attributes": {
 "source": "python"
   }

>>>
>>>
>>> My pipeline currently looks like this (id_label is commented out when
>>> running directly, as it causes a not implemented error):-
>>>
>>> messages = (p
>>>     | 'Read From Pub Sub' >> ReadFromPubSub(
>>>           subscription=known_args.input_subscription,
>>>           with_attributes=True)
>>>           # ,id_label='message_id')
>>>     | 'Parse JSON' >> beam.Map(format_message_element)
>>>
>>> My function to parse the message looks like this:-
>>>
>>> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>>>     messagedict = json.loads(message.data)
>>>     rownumber = messagedict['rownumber']
>>>     fullmessage = {'data' : json.dumps(message.data),
>>>                    'rownumber' : int(rownumber),
>>>                    'attributes' : json.dumps(message.attributes),
>>>                    'timestamp' : float(timestamp)}
>>>
>>>     logging.info(message.attributes)
>>>     logging.info(message)
>>>
>>>     return (rownumber, fullmessage)
>>>
>>> I'm aware there are the id_label and with_timestamp parameters for the
>>> ReadFromPubSub method; however, these don't seem to work with the direct
>>> runner, as per
>>> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
>>> which makes testing somewhat difficult.
>>>
>>> My full code is attached; when running above 2.9.0 of the SDK I can't
>>> get past the windowing function, due to an issue that appears related to
>>> this
>>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>> and this
>>> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
>>> as I was receiving the following error when running on 2.12.0:
>>>
>>> Cannot convert GlobalWindow to
>>> apache_beam.utils.windowed_value._IntervalWindowBase [while running
>>> 'generatedPtransform-150']
>>>
>>> On 2.9.0 when running on the local runner, I receive the following
>>> output from the logging.info calls in format_message_element:
>>>
>>> INFO:root:{u'source': u'python'}
>>> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>>>
>>> I was expecting the messageId and publishTime as part of the object
>>> returned; but as you can see there's nothing there for those attributes.
>>>
>>> (The code does not quite map correctly to the BigQuery table so it fails
>>> inserts at that point, which I'm currently trying to resolve!)
>>>
>>> Kind regards,
>>>
>>> Matthew
>>>
>>>
>>>
>>> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>>>
>>> Hi Matthew,
>>>
>>> Welcome to Beam!
>>>
>>> Looking at Python PubSub IO API, you should be able to access id and
>>> timestamp by setting `with_attributes=True` when using `ReadFromPubSub`
>>> PTransform, see [1,2].
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
>>> [2]
>>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>>>
>>> On Fri, Jul 12, 2019 

Re: [python SDK] Returning Pub/Sub message_id and timestamp

2019-07-19 Thread Pablo Estrada
Beam 2.14.0 will include support for writing files in the fileio module
(the support will include GCS, local files, HDFS). It will also support
streaming. The transform is still marked as experimental, and is likely to
receive improvements - but you can check it out for your pipelines, and see
if it helps you : )
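
A rough, untested sketch of what that could look like for the Pub/Sub-to-GCS
case is below; the bucket path and subscription are placeholders, and it
assumes the fileio.FileSink interface (open/write/flush). Note that in
streaming the elements need to be windowed before WriteToFiles:

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class LineSink(fileio.FileSink):
    """Writes each element as one UTF-8 line."""
    def open(self, fh):
        self._fh = fh

    def write(self, record):
        self._fh.write(record.encode('utf-8') + b'\n')

    def flush(self):
        pass

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(
           subscription='projects/<project>/subscriptions/<sub>')
     | 'Decode' >> beam.Map(lambda b: b.decode('utf-8'))
     | 'Window' >> beam.WindowInto(FixedWindows(60))
     | 'Write' >> fileio.WriteToFiles(
           path='gs://<bucket>/raw/',
           sink=lambda destination: LineSink(),
           shards=1))
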
Best
-P.

On Fri, Jul 19, 2019 at 12:24 PM Valentyn Tymofieiev 
wrote:

> As of today, Beam Python streaming does not support writing to GCS yet,
> which  explains
> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>  .
>
> You are right - id_label and timestamp_attribute do not work on the Direct
> runner yet as per https://issues.apache.org/jira/browse/BEAM-4275. I
> checked with a few folks and that seems to be the current status, but you
> can still give them a try on the Dataflow runner.
>
> You may also find the following examples helpful:
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
> (streaming pipeline).
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
>  (batch
> pipeline)
>
> https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
> .
>
> On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
> matthew.dar...@carfinance247.co.uk> wrote:
>
>> Hi Valentyn,
>>
>> Thank you for your reply. I'm already using the with_attributes=True
>> option, however this returns the attributes property of the JSON, i.e :-
>>
>> {
>>>   "attributes": {
>>> "source": "python"
>>>   }
>>>
>>
>>
>> My pipeline currently looks like this (id_label is commented out when
>> running directly, as it causes a not implemented error):-
>>
>> messages = (p
>>     | 'Read From Pub Sub' >> ReadFromPubSub(
>>           subscription=known_args.input_subscription,
>>           with_attributes=True)
>>           # ,id_label='message_id')
>>     | 'Parse JSON' >> beam.Map(format_message_element)
>>
>> My function to parse the message looks like this:-
>>
>> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>>     messagedict = json.loads(message.data)
>>     rownumber = messagedict['rownumber']
>>     fullmessage = {'data' : json.dumps(message.data),
>>                    'rownumber' : int(rownumber),
>>                    'attributes' : json.dumps(message.attributes),
>>                    'timestamp' : float(timestamp)}
>>
>>     logging.info(message.attributes)
>>     logging.info(message)
>>
>>     return (rownumber, fullmessage)
>>
>> I'm aware there are the id_label and with_timestamp parameters for the
>> ReadFromPubSub method; however, these don't seem to work with the direct
>> runner, as per
>> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
>> which makes testing somewhat difficult.
>>
>> My full code is attached; when running above 2.9.0 of the SDK I can't get
>> past the windowing function, due to an issue that appears related to this
>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>> and this
>> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
>> as I was receiving the following error when running on 2.12.0:
>>
>> Cannot convert GlobalWindow to
>> apache_beam.utils.windowed_value._IntervalWindowBase [while running
>> 'generatedPtransform-150']
>>
>> On 2.9.0 when running on the local runner, I receive the following output
>> from the logging.info calls in format_message_element:
>>
>> INFO:root:{u'source': u'python'}
>> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>>
>> I was expecting the messageId and publishTime as part of the object
>> returned; but as you can see there's nothing there for those attributes.
>>
>> (The code does not quite map correctly to the BigQuery table so it fails
>> inserts at that point, which I'm currently trying to resolve!)
>>
>> Kind regards,
>>
>> Matthew
>>
>>
>>
>> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>>
>> Hi Matthew,
>>
>> Welcome to Beam!
>>
>> Looking at Python PubSub IO API, you should be able to access id and
>> timestamp by setting `with_attributes=True` when using `ReadFromPubSub`
>> PTransform, see [1,2].
>>
>> [1]
>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
>> [2]
>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>>
>> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <
>> matthew.dar...@carfinance247.co.uk> wrote:
>>
>> Good morning,
>>
>> I'm very new to Beam, and pretty new to Python so please first accept my
>> apologies for any obvious misconceptions/mistakes in the following.
>>
>> I am currently trying to develop a sample pipeline in Python to pull
>> messages from Pub/Sub and then write 

Re: [python SDK] Returning Pub/Sub message_id and timestamp

2019-07-19 Thread Valentyn Tymofieiev
As of today, Beam Python streaming does not support writing to GCS yet,
which  explains
https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
 .

You are right - id_label and timestamp_attribute do not work on the Direct
runner yet as per https://issues.apache.org/jira/browse/BEAM-4275. I
checked with a few folks and that seems to be the current status, but you
can still give them a try on the Dataflow runner.
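
For reference, a minimal sketch of how that read could look on the Dataflow
runner is below; the subscription path and the attribute names (which the
publisher would have to set on each message) are hypothetical:

from apache_beam.io.gcp.pubsub import ReadFromPubSub

messages = (
    p  # an existing streaming Pipeline object
    | 'Read From Pub Sub' >> ReadFromPubSub(
        subscription='projects/<project>/subscriptions/<subscription>',
        with_attributes=True,                    # emit PubsubMessage objects
        id_label='my_id_attribute',              # attribute used to deduplicate
        timestamp_attribute='my_ts_attribute'))  # attribute used as event time

If timestamp_attribute is left unset, the Pub/Sub publish time is used as the
element timestamp, which a DoFn can read via beam.DoFn.TimestampParam.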

You may also find the following examples helpful:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
(streaming pipeline).
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
(batch
pipeline)
https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
.
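
Along the same lines, here is a hedged variant of the parsing step from the
message quoted below; it keeps Matthew's field names, and the function name is
just illustrative. The timestamp parameter is the element timestamp, i.e. the
publish time when no timestamp_attribute is configured:

import json

import apache_beam as beam

def parse_pubsub_message(message, timestamp=beam.DoFn.TimestampParam):
    # 'message' is a PubsubMessage because with_attributes=True is used upstream.
    body = json.loads(message.data.decode('utf-8'))
    return {
        'rownumber': int(body['rownumber']),
        'data': message.data.decode('utf-8'),
        'attributes': json.dumps(dict(message.attributes)),
        'timestamp': float(timestamp),  # element timestamp in seconds
    }

# Usage: parsed = messages | 'Parse JSON' >> beam.Map(parse_pubsub_message)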

On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
matthew.dar...@carfinance247.co.uk> wrote:

> Hi Valentyn,
>
> Thank you for your reply. I'm already using the with_attributes=True
> option, however this returns the attributes property of the JSON, i.e :-
>
> {
>>   "attributes": {
>> "source": "python"
>>   }
>>
>
>
> My pipeline currently looks like this (id_label is commented out when
> running directly, as it causes a not implemented error):-
>
> messages = (p
>     | 'Read From Pub Sub' >> ReadFromPubSub(
>           subscription=known_args.input_subscription,
>           with_attributes=True)
>           # ,id_label='message_id')
>     | 'Parse JSON' >> beam.Map(format_message_element)
>
> My function to parse the message looks like this:-
>
> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>     messagedict = json.loads(message.data)
>     rownumber = messagedict['rownumber']
>     fullmessage = {'data' : json.dumps(message.data),
>                    'rownumber' : int(rownumber),
>                    'attributes' : json.dumps(message.attributes),
>                    'timestamp' : float(timestamp)}
>
>     logging.info(message.attributes)
>     logging.info(message)
>
>     return (rownumber, fullmessage)
>
> I'm aware there are the id_label and with_timestamp parameters for the
> ReadFromPubSub method; however, these don't seem to work with the direct
> runner, as per
> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
> which makes testing somewhat difficult.
>
> My full code is attached; when running above 2.9.0 of the SDK I can't get
> past the windowing function, due to an issue that appears related to this
> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
> and this
> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
> as I was receiving the following error when running on 2.12.0:
>
> Cannot convert GlobalWindow to
> apache_beam.utils.windowed_value._IntervalWindowBase [while running
> 'generatedPtransform-150']
>
> On 2.9.0 when running on the local runner, I receive the following output
> from the logging.info calls in format_message_element:
>
> INFO:root:{u'source': u'python'}
> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>
> I was expecting the messageId and publishTime as part of the object
> returned; but as you can see there's nothing there for those attributes.
>
> (The code does not quite map correctly to the BigQuery table so it fails
> inserts at that point, which I'm currently trying to resolve!)
>
> Kind regards,
>
> Matthew
>
>
>
> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>
> Hi Matthew,
>
> Welcome to Beam!
>
> Looking at Python PubSub IO API, you should be able to access id and
> timestamp by setting `with_attributes=True` when using `ReadFromPubSub`
> PTransform, see [1,2].
>
> [1]
> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
> [2]
> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>
> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <
> matthew.dar...@carfinance247.co.uk> wrote:
>
> Good morning,
>
> I'm very new to Beam, and pretty new to Python so please first accept my
> apologies for any obvious misconceptions/mistakes in the following.
>
> I am currently trying to develop a sample pipeline in Python to pull
> messages from Pub/Sub and then write them to either files in cloud storage
> or to BigQuery. The ultimate goal will be to utilise the pipeline for real
> time streaming of event data to BigQuery (with various transformations) but
> also to store the raw messages long term in files in cloud storage.
>
> At the moment, I'm simply trying to parse the message to get the PubSub
> messageId and publishTime in order to be able to write them into the
> output. The json of my PubSub message looks like this:-
>
> [
>   {
> "ackId":
> 

Re: Beam release 2.5.0 tag SNAPSHOT version

2019-07-19 Thread Kenneth Knowles
Good catch.

The release 2.5.0 was built with Gradle, so that pom is left over. The
Gradle release plugin does not edit poms, so it did not change that.
Instead, the poms are generated, and you can find them on Maven Central, e.g.
https://repo1.maven.org/maven2/org/apache/beam/beam-runners-direct-java/2.5.0/beam-runners-direct-java-2.5.0.pom.
Today the pom has been removed and it is generated at run time.

Since you are digging into details, you may also be interested in the RC
vote and verification thread:
https://lists.apache.org/thread.html/dac093378fb27331c5d9d86a3bd03397f7e186d482f0fc8c8ef88feb@%3Cdev.beam.apache.org%3E

To answer your original question, I do believe the tags v2.5.0 and
v2.5.0-RC2 point to the commit where the released artifacts were built.

Kenn

On Wed, Jul 17, 2019 at 10:54 PM Abdul Qadeer  wrote:

> Hi Kenneth!
>
> But even the tag v2.5.0 points to a SNAPSHOT version for Maven:
> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
>
> The same file in v2.4.0:
> https://github.com/apache/beam/blob/v2.4.0/pom.xml#L37
>
> I am looking to publish v2.5.0 artifacts via Maven to a private
> Artifactory, and without the right versions they will be published as a
> SNAPSHOT version. Shall I raise a PR for this?
>
> On Wed, Jul 17, 2019 at 3:41 PM Kenneth Knowles  wrote:
>
>> What you have pointed to is the tip of the release-2.5.0 branch. The
>> Gradle release plugin mimics the Maven release plugin, so it has rolled
>> back the version change and the branch is always at a snapshot version.
>>
>> The commit before that is tag v2.5.0 and that is the final tag. Here is
>> the gradle properties:
>> https://github.com/apache/beam/blob/v2.5.0/gradle.properties
>>
>> I do believe this should be 2.5.0, not 2.5.0-RC2. But anyhow I think that
>> is the commit that was used to build 2.5.0.
>>
>> Kenn
>>
>> On Wed, Jul 17, 2019 at 3:36 PM Kenneth Knowles  wrote:
>>
>>> I take that back - misclicked on 0.5.0 (which has a correct tag).
>>>
>>> On Wed, Jul 17, 2019 at 3:34 PM Kenneth Knowles  wrote:
>>>
 Looks like it is this:
 https://github.com/apache/beam/tree/4838ae16c172252bc0a15e3a984e085f82e25c2d

 I believe the release manager created the tag to point to the tip of
 the release branch after the maven release plugin rolled that change back
 (this is how the maven release plugin works since it does not utilize git's
 branching model).

 I will fix the tag.

 Kenn

 On Wed, Jul 17, 2019 at 3:32 PM Abdul Qadeer 
 wrote:

> Hi!
>
> Why is v2.5.0 tag for Beam 2.5.0 release SNAPSHOT version here?
> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
> https://github.com/apache/beam/blob/release-2.5.0/gradle.properties#L25
>
> Please let me know where to find final release 2.5.0 commit.
>



How to deal with failed Checkpoint? What is current behavior for subsequent checkpoints?

2019-07-19 Thread Ken Barr
Reading the below two statements I conclude that 
CheckpointMark.finalizeCheckpoint() will be called in order, unless there is a 
failure.
What happens in a failure?
What happens to subsequent checkpoints in the case of a checkpoint failure?
How do I prevent event re-ordering in the case of a checkpoint failure?

There is no rewind method and I cannot find a way to tell the UnboundedReader
to rewind. If I look at the Pub/Sub UnboundedSource, it seems to deal with the
condition where the finalize itself fails, but not with a failure elsewhere in
the checkpointing. The only time a rewind (nackAll()) happens is when a new
reader is created. If I have an UnboundedSource that is capable of rewinding to
the last valid checkpoint and replaying events, how could I indicate to it to
do so in the case of a checkpoint failure that occurs outside the
UnboundedSource?

https://beam.apache.org/releases/javadoc/2.8.0/org/apache/beam/sdk/io/UnboundedSource.CheckpointMark.html
"In the absence of failures, all checkpoints will be finalized and they will be 
finalized in the same order they were taken from the 
UnboundedSource.UnboundedReader."
"It is possible for a checkpoint to be taken but this method never called. This 
method will never be called if the checkpoint could not be committed, and other 
failures may cause this method to never be called."



Re: Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.lang.NoClassDefFoundError: Could not initialize class com.google.common.io.BaseEncoding

2019-07-19 Thread Ryan Skraba
Hello!  These are the "fun" problems to track down.

I believe the GoogleCredentials class (0.12.0 in Beam, if that's where
it's coming from) brings in an unvendored/unshaded dependency on
guava-20.x. BaseEncoding was introduced in guava-14.x.

Someplace in your job, there's probably an older version of guava
taking precedence... I believe Flink uses a shaded version of guava,
so that shouldn't be the problem.

Preinstalled libraries on EMR are all over the place, from guava-11.x
brought in by hadoop 2.8.5 (and guava-14.x by Spark 2.4.3 but that
shouldn't be relevant).

Is it possible to check the classpath for your Flink job on EMR?  That
would be a good starting point.

Good luck!  Ryan

On Thu, Jul 18, 2019 at 3:27 PM jitendra sharma
 wrote:
>
> Hi,
>
> I was running the Apache Beam pipeline locally, where it was running well.
> But when I try to run it from the EMR cluster, I am getting the error below.
> Can you please provide some suggestions on this?
>
> Caused by: java.lang.Exception: The user defined 'open()' method caused an 
> exception: java.lang.NoClassDefFoundError: Could not initialize class 
> com.google.common.io.BaseEncoding
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.beam.sdk.util.UserCodeException: 
> java.lang.NoClassDefFoundError: Could not initialize class 
> com.google.common.io.BaseEncoding
> at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
> at 
> io.condenast.de.ProcessDestinationSegmentsFn$DoFnInvoker.invokeSetup(Unknown 
> Source)
> at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.open(FlinkDoFnFunction.java:137)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> ... 3 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> com.google.common.io.BaseEncoding
> at com.google.api.client.util.Base64.decodeBase64(Base64.java:101)
> at com.google.api.client.util.PemReader.readNextSection(PemReader.java:99)
> at 
> com.google.api.client.util.PemReader.readFirstSectionAndClose(PemReader.java:128)
> at 
> com.google.auth.oauth2.ServiceAccountCredentials.privateKeyFromPkcs8(ServiceAccountCredentials.java:255)
> at 
> com.google.auth.oauth2.ServiceAccountCredentials.fromPkcs8(ServiceAccountCredentials.java:245)
> at 
> com.google.auth.oauth2.ServiceAccountCredentials.fromJson(ServiceAccountCredentials.java:169)
> at 
> com.google.auth.oauth2.GoogleCredentials.fromStream(GoogleCredentials.java:162)
> at 
> com.google.auth.oauth2.GoogleCredentials.fromStream(GoogleCredentials.java:129)
> at 
> io.condenast.de.ProcessDestinationSegmentsFn.setup(ProcessDestinationSegmentsFn.java:41)
>
> Regards,
> Jitendra Sharma