Re: Contributor permission for Jira tickets

2021-03-25 Thread Fabien Caylus
Hi!

I'm sorry for this second message, but I realized that Gmail flagged my
first one as spam because I sent it through the Pony Mail interface.
It may have been flagged as spam for others too, hence the second
message.

Thanks!

On Thu, Mar 18, 2021 at 20:11, Fabien Caylus wrote:

> Hello everyone !
>
> I found and created an issue in Jira, and would like to assign it to
> myself so I can start working on it.
> As the contribution guide suggests, can I be added to the contributors
> list ?
>
> My Jira username is: fcaylus
>
> Thanks !
>


Re: Contributor permission for Jira tickets

2021-03-25 Thread Ahmet Altay
Done. Welcome.

On Thu, Mar 25, 2021 at 3:39 AM Fabien Caylus 
wrote:

> Hi!
>
> I'm sorry for this second message, but I realized that Gmail flagged my
> first one as spam because I sent it through the Pony Mail interface.
> It may have been flagged as spam for others too, hence the second
> message.
>
> Thanks!
>
> On Thu, Mar 18, 2021 at 20:11, Fabien Caylus wrote:
>
>> Hello everyone !
>>
>> I found and created an issue in Jira, and would like to assign it to
>> myself so I can start working on it.
>> As the contribution guide suggests, can I be added to the contributors
>> list ?
>>
>> My Jira username is: fcaylus
>>
>> Thanks !
>>
>


Re: BEAM-3713: Moving from nose to pytest

2021-03-25 Thread Udi Meiri
Hi Benjamin,

AFAIK nose is only used for integration tests (unit tests were converted to
pytest a while back).
These ITs should all be running periodically (except maybe the release
related ones?).

I would start with selecting one of the Jenkins jobs and converting the ITs
in it to pytest.
Good place to start:
https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/
I would prioritize converting the Python jobs listed here:
https://github.com/apache/beam/blob/master/.github/PULL_REQUEST_TEMPLATE.md

There's a fairly old abandoned PR with some ideas:
https://github.com/apache/beam/pull/7949/files
Have a look at:
sdks/python/scripts/run_integration_test.sh
sdks/python/pytest.ini
sdks/python/conftest.py

My idea in that PR was to replace the nose @attr('IT') decorators with 1 or
more:
@pytest.mark.it_postcommit,
@pytest.mark.no_direct,
etc.
These decorators tell nose/pytest which tests to run.
So if I wanted to run post-commit tests on direct runner I would use this
pytest flag:
"-m 'it_postcommit and not no_direct'".
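
As a rough sketch of the marker idea: the two test functions and their bodies below are made up for illustration, and the marker names are just the ones proposed in that PR, which I assume would still need to be registered in pytest.ini. The `marker_names` helper is also hypothetical and only there to show which marks ended up on each test.

```python
import pytest


@pytest.mark.it_postcommit
def test_wordcount_it():
    # Hypothetical integration test that any runner can execute.
    assert sum([1, 2, 3]) == 6


@pytest.mark.it_postcommit
@pytest.mark.no_direct
def test_streaming_it():
    # Hypothetical integration test that the direct runner should skip.
    assert "beam".upper() == "BEAM"


def marker_names(test_fn):
    # pytest records applied marks on the function's `pytestmark` attribute.
    return {mark.name for mark in getattr(test_fn, "pytestmark", [])}
```

With the flag above, pytest would select test_wordcount_it and deselect test_streaming_it, since only the latter carries no_direct.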


On Wed, Mar 24, 2021 at 5:41 PM Ahmet Altay  wrote:

> All PRs look either merged or closed.
>
> +Udi Meiri might have more information about the
> remaining work.
>
> On Wed, Mar 24, 2021 at 5:29 PM Benjamin Gonzalez Delgado <
> benjamin.gonza...@wizeline.com> wrote:
>
>> Hi team,
>> I am planning to work on BEAM-3713, but I see there are already
>> PRs related to the task.
>> Could someone guide me on the work that remains for the
>> migration from nose to pytest?
>> Any guidance on this would be appreciated.
>>
>> Thanks!
>> Benjamin
>
>




Re: BEAM-3304 - Go Trigger RuntimeError question

2021-03-25 Thread Robert Burke
From the outset, I agree with you that there's something the Go SDK didn't
expect in the stream of bytes it received. The non-global Window handling
in the Go SDK is grossly undertested to begin with, other than the
windowed_wordcount example (which I've run maybe once or twice). As an
aside, this is something we should change now that we have an improved
integration test setup to validate things against the Python Portable ULR.

This one is rightly tricky, so I'm going to type out my debugging process.
Search for *tl;dr;* to skip to the end for the result of the debugging.

The error messages can always be improved, but it's been a while since I've
seen an encoding error like this.
First, let's rule out that the varint coder is doing something wrong.

The important part is the bit after the plan:
```
caused by:
stream value decode failed
caused by:
invalid varintz encoding for: []
```

So I search the GitHub Beam repo for that error message, "invalid varintz
encoding" [1], finding one location: the varintz decoder [2].
The '[]' at the end of the error message is the byte array it was trying to
decode, which means it apparently received a 0 length []byte
after reading, and couldn't handle it.

It put the []byte into `binary.Varint`, which, if we look at the imports,
comes from the "encoding/binary" package [3]. As per the Varint
documentation [4], when the returned byte count n is 0, it means the
[]byte buffer was too small (in this case 0 length), so the error is
legitimate, and the returned value is invalid.

Because I know this varint should have been encoded by Beam (knowing how
the windowed_wordcount.go example works), I look
at the encoding function and see that it uses `binary.PutVarint` [5] to
encode the value. Looking at the documentation for PutVarint [6], there's an
example which we can modify to also print out the "n" length written to the
buffer, confirming that even the most likely candidate for a 0 length byte
buffer, a 0 value, always encodes to at least 1 byte.
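
To make that check concrete without a Go toolchain, here is a minimal Python sketch of the same zigzag varint scheme that `binary.PutVarint` and `binary.Varint` use. The function names `put_varint` and `varint` are my own, this is a re-implementation for illustration rather than the Go SDK's code, and it leaves out Go's overflow handling.

```python
def put_varint(value: int) -> bytes:
    """Encode a signed 64-bit integer in the style of Go's binary.PutVarint."""
    # Zigzag step: map signed to unsigned so small magnitudes stay small.
    unsigned = ((value << 1) ^ (value >> 63)) & 0xFFFFFFFFFFFFFFFF
    out = bytearray()
    while unsigned >= 0x80:
        # Low 7 bits plus a continuation bit.
        out.append((unsigned & 0x7F) | 0x80)
        unsigned >>= 7
    out.append(unsigned)  # Final byte has no continuation bit.
    return bytes(out)


def varint(buf: bytes):
    """Decode in the style of Go's binary.Varint.

    Returns (value, n). n == 0 means the buffer was too small.
    Overflow detection is omitted in this sketch.
    """
    unsigned = 0
    shift = 0
    for i, byte in enumerate(buf):
        if byte < 0x80:
            unsigned |= byte << shift
            # Undo the zigzag mapping.
            return (unsigned >> 1) ^ -(unsigned & 1), i + 1
        unsigned |= (byte & 0x7F) << shift
        shift += 7
    return 0, 0
```

Here `put_varint(0)` yields a single zero byte, while `varint(b"")` reports n == 0, which is the "buffer too small" case the decoder complained about.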

OK, the varint coder did everything expected of it. So now the question becomes
"Why is there a 0 length prefix where the integer value is expected?"
What's different here?
The printed out plan shows the hydrated ProcessBundleDescriptor as the Go
SDK interpreted it. The ordering is reversed, but the first and last lines
are the DataSource and DataSink, and those are the only places where a
Coder is going to be used. They're the boundaries for the data entering and
leaving the Go SDK respectively, so that's where decoding and encoding
happen. Let's look at the plan line for DataSource.

1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
Coder:W;c8_windowed>!IWC Out:7

The first part is the index (a 1), which is just where it is in the plan
listing. It's used for more complex graphs: the Out lines refer to it to
show which PTransform(s) a transform outputs to.
The next part says it's a DataSource. This line comes from the
DataSource's String method [7]. It's outputting the Stream ID
"S[CombinePerKey/Group/Read@localhost:63851]" the runner gave it to access
the right data over the data channel, and the name "out", which is the key
used to identify this bundle in particular (used in progress reporting).
This is followed by the Coder, and then what the DataSource passes its
output to. In this case, it's passing it to "7:
MergeAccumulators[stats.sumIntFn]".

The Coder is the important part for this bug hunt. I'm not 100% satisfied
with the format we've ended up with here, but it is what it is. It's
a result of the general coder String() method [8].
What this coder is saying is that it's receiving CoGBK values,
wrapped in an Interval Window Coder.
Loosely, the ;c# (and that c8_windowed) values are the ids into the job's
pipeline proto coders map. It prints out its kind, the coder id, and any
component coders it might have.

This seems all in order, as it's a windowed job. I'd expect this for
unmodified wordcounts as well. It does give us the next place to look
though, Coder handling! The Beam standard coders are defined in the
beam_runner_api.proto [9]. We're about to become good friends with the
coding formats listed there.

The DataSource creates the window and value coders separately at the top of
its Process method [10], and then does special handling if the component
of the window is a CoGBK coder. Then the per element processing begins. The
Beam Datachannel at this point is represented as a stream of bytes (using
Go's io.Reader interface). Each element for this DataSource represents a
windowed value. The windowed values are the Key, and all the values using
that key passed to the GBK.

Aside: The special CoGBK handling is because Beam doesn't have a first
class notion of a CoGBK coder or a GBK coder; it's represented with KVs and
Iterables.
Simple GBKs are represented by a KV<K, Iterable<V>> coder, and it's
the Iterable that's the tricky portion, as the Go SDK doesn't provide a
first class notion of an unbounded iterable for users to pass around. It

Beam College Starts in 2 weeks!

2021-03-25 Thread Mara Ruvalcaba

Hi Apache Beam Community,

We invite you to join Beam College and improve your data analytics
skills. Beam College provides free training to help supercharge your
data processing skills on Apache Beam, where you will learn how to
improve your data and stream processing pipelines.

The 5 day training is highly customizable, covering from basic concepts
to advanced features and best practices, so you can sign up and drop in
based on topics of your interest and needs.

Some of the topics we'll cover:

 * Introduction to the data processing ecosystem
 * Advanced distributed data processing with Apache Beam
 * Features to scale and productionalize your business case
 * Strategies for performance and cost optimization
 * Best practices for debugging Beam pipelines

Check out the full curriculum at https://beamcollege.dev/all-courses/

Meet our speakers:

We will have an amazing lineup of speakers,
https://beamcollege.dev/instructors/

Register now for free!
https://beamcollege.dev/forms-enrollment-workshop1/

--
Mara Ruvalcaba
COO, SG Software Guru & Nearshore Link
USA: 512 296 2884
MX: 55 5239 5502



Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Alexey Romanenko
I think you are right: since "writer.close()" contains business logic, it
must be moved to @FinishBundle. The same goes for DeleteFn.
I'll create a Jira for that.
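
To illustrate the intended lifecycle, here is a small plain-Python sketch. It is not the CassandraIO code and not a Beam class: `AsyncWriteFn`, `run_bundle`, and the in-memory `store` are hypothetical stand-ins, with method names borrowed from the Python DoFn lifecycle. The point is only that pending async writes are awaited at the end of each bundle, and teardown does nothing but release resources.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncWriteFn:
    """Hypothetical stand-in for a DoFn-like async writer."""

    def setup(self):
        # Long-lived resources, created once per instance.
        self._pool = ThreadPoolExecutor(max_workers=4)
        self.store = []  # Pretend sink, for illustration only.

    def start_bundle(self):
        self._pending = []

    def process(self, element):
        # Kick off the write without blocking on it.
        future = self._pool.submit(self.store.append, element)
        self._pending.append((element, future))

    def finish_bundle(self):
        # Wait for every write in this bundle, and only then report
        # the elements as written.
        written = []
        for element, future in self._pending:
            future.result()  # Blocks; re-raises if the write failed.
            written.append(element)
        self._pending = []
        return written

    def teardown(self):
        # Resource cleanup only; nothing is emitted from here.
        self._pool.shutdown()


def run_bundle(fn, elements):
    # Drive one bundle through the lifecycle methods.
    fn.start_bundle()
    for element in elements:
        fn.process(element)
    return fn.finish_bundle()
```

Because the elements are returned from finish_bundle, a failed write surfaces inside the bundle and nothing is reported as written before its future has completed.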

> On 25 Mar 2021, at 00:49, Kenneth Knowles  wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just 
> reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn I see a big problem:
> 
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
> 
> @ProcessElement
> public void processElement(ProcessContext c)
>     throws ExecutionException, InterruptedException {
>   writer.mutate(c.element());
> }
> 
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
> 
> It is only in writer.close() that all async writes are waited on. This needs 
> to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as 
> "has been written". And you cannot output from the @Teardown method, which is 
> just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato wrote:
> How about a PCollection containing every element which was successfully 
> written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after it's been successfully written to 
> the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía wrote:
> +dev
> 
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
> 
> My proposal is that one returns a PCollection that consists, internally, 
> of something contentless like nulls. This is future compatible with returning 
> something more meaningful based on the source or write process itself, 
> but at least this would be followable. 
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>  
> 
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible.
> 
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> other than PDone is worth it. In that case we can deprecate and remove
> the previous signature in a short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko wrote:
> >
> > I thought that was said about returning a PCollection of write results as 
> > it's done in other IOs (as I mentioned as examples) that have _additional_ 
> > write methods, like "withWriteResults()" etc, that return PTransform<…, 
> > PCollection>.
> > In this case, we keep backwards compatibility and just add new 
> > functionality. Though, we need to follow the same pattern for user API and 
> > maybe even naming for this feature across different IOs (like we have for 
> > "readAll()" methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible. PRs to add non-PDone returning 
> > variants (probably as another option to the builders) that compose well 
> > with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko wrote:
> >>
> >> In this way, I think the "Wait" PTransform should work for you but, as was 
> >> mentioned before, it doesn't work with PDone, only with a PCollection as a 
> >> signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to 
> >> contribute it back to Beam in the same way as was done for other IOs (for 
> >> example, JdbcIO [1] or BigtableIO [2]).
> >>
> >> In general, I think we need to have it for all IOs, at least for use with 
> >> "Wait", because this pattern is quite often required.
> >>
> >> [1] 
> >> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/i

Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Robert Bradshaw
On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles  wrote:
>
>> The reason I was checking out the code is that sometimes a natural thing
>> to output would be a summary of what was written. So each chunk of writes
>> and the final chunk written in @FinishBundle. This is, for example, what
>> SQL engines do (output # of rows written).
>>
>> You could output both the summary and the full list of written elements
>> to different outputs, and users can choose. Outputs that are never consumed
>> should be very low or zero cost.
>>
>>
> I like this approach.  I would much prefer two outputs (one of which is
> all elements written) to returning an existential/wildcard PCollection.
>

+1, this would work well too. Returning a PCollectionTuple is extensible
too, as one could add more (or better) outputs in the future without
changing the signature.


>
>
>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw 
>> wrote:
>>
>>> Yeah, the entire input is not always what is needed, and can generally
>>> be achieved via
>>>
>>> input -> wait(side input of write) -> do something with the input
>>>
>>> Of course one could also do
>>>
>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>> CombineGlobally(TrivialCombineFn)
>>>
>>> to reduce this to a more minimal set with at least one element per
>>> Window.
>>>
>>> The file writing operations emit the actual files that were written,
>>> which can be handy. My suggestion of PCollection was just so that we can
>>> emit something usable, and decide exactly what is most useful later.
>>>
>>>
>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax wrote:
>>>
>>>> I believe that the Wait transform turns this output into a side input,
>>>> so outputting the input PCollection might be problematic.


Re: Upgrading vendored gRPC from 1.26.0 to 1.36.0

2021-03-25 Thread Ismaël Mejía
Precommit has been quite unstable in the last few days, so it's worth
checking whether something is wrong in the CI.

I have a question, Kenn. Given that cherry-picking this might be a bit
big as a change, can we reconsider cutting the 2.29.0 branch again
after the updated gRPC version gets merged, and move the issues
already marked as fixed for version 2.30.0 to version 2.29.0? Seems like an
easier upgrade path (and we will get some nice fixes/improvements like
official Spark 3 support for free on the release).

WDYT?


On Wed, Mar 24, 2021 at 8:06 PM Tomo Suzuki  wrote:
>
> Update: I observe that the Java precommit check is unstable in the PR to upgrade 
> vendored gRPC (compared with a PR with an empty change). There are no consistent 
> failures; sometimes it succeeds and other times it hits timeouts and flaky 
> test failures.
>
> https://github.com/apache/beam/pull/14295#issuecomment-806071087
>
>
> On Mon, Mar 22, 2021 at 10:46 AM Tomo Suzuki  wrote:
>>
>> Thank you for the voting and I see the artifact available in Maven Central. 
>> I'll work on the PR to use the published artifact today.
>> https://search.maven.org/artifact/org.apache.beam/beam-vendor-grpc-1_36_0/0.1/jar
>>
>> On Tue, Mar 16, 2021 at 3:07 PM Kenneth Knowles  wrote:
>>>
>>> Update on this: there are some minor issues and then I'll send out the RC.
>>>
>>> I think this is worth blocking 2.29.0 release on, so I will do this first. 
>>> We are still eliminating other blockers from 2.29.0 anyhow.
>>>
>>> Kenn
>>>
>>> On Mon, Mar 15, 2021 at 7:17 AM Tomo Suzuki wrote:
>>>>
>>>> Hi Beam developers,
>>>>
>>>> I'm working on upgrading the vendored gRPC to 1.36.0
>>>> https://issues.apache.org/jira/browse/BEAM-11227 (PR: 
>>>> https://github.com/apache/beam/pull/14028)
>>>> Let me know if you have any questions or concerns.
>>>>
>>>> Background:
>>>> From messages exchanged with Ismaël in BEAM-11227, it seems that the ticket 
>>>> created by some automation is a false positive, but it's nice to use an 
>>>> artifact that isn't flagged with a CVE.
>>>>
>>>> Kenn offered to work as the release manager (as in 
>>>> https://s.apache.org/beam-release-vendored-artifacts) of the vendored 
>>>> artifact.
>>>>
>>>> --
>>>> Regards,
>>>> Tomo
>>
>>
>>
>> --
>> Regards,
>> Tomo
>
>
>
> --
> Regards,
> Tomo


Re: Upgrading vendored gRPC from 1.26.0 to 1.36.0

2021-03-25 Thread Kenneth Knowles
Yes, I agree this might be a good idea. This is not the only major issue on
the release-2.29.0 branch.

The counter argument is that we will be pulling in all the bugs introduced
to `master` since the branch cut.

As far as effort goes, I have been mostly focused on burning down the bugs
so I would not lose much work in the release process.

Kenn

On Thu, Mar 25, 2021 at 1:42 PM Ismaël Mejía  wrote:

> Precommit is quite unstable in the last days, so worth to check if
> something is wrong in the CI.
>
> I have a question Kenn. Given that cherry picking this might be a bit
> big as a change can we just reconsider cutting the 2.29.0 branch again
> after the updated gRPC version use gets merged and mark the issues
> already fixed for version 2.30.0 to version 2.29.0 ? Seems like an
> easier upgrade path (and we will get some nice fixes/improvements like
> official Spark 3 support for free on the release).
>
> WDYT?


PSA: "Java PostCommit" is now a few separate jobs

2021-03-25 Thread Kenneth Knowles
If you were going to comment "Run Java PostCommit" and wait 4 hours you now
need to choose from one/many of:

 - Run Java PostCommit (now expected <20m)
 - Run PostCommit_Java_Hadoop_Versions (now expected <20m)
 - Run PostCommit_Java_Dataflow (about 1 hour?)
 - Run PostCommit_Java_DataflowV2 (about 1 hour?)

Kenn


Python Dataframe API issue

2021-03-25 Thread Xinyu Liu
Hi, folks,

I am playing around with the Python Dataframe API, and seemingly hit a schema
issue when converting a PCollection to a dataframe. I wrote the following code
for a simple test:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

p = beam.Pipeline()
data = (p | beam.Create([('a', ''), ('b', '')])
        | beam.Map(lambda x: beam.Row(word=x[0], val=x[1])))
_ = data | beam.Map(print)
p.run()

This shows the following:
Row(val='', word='a')
Row(val='', word='b')

But if I use to_dataframe() to convert it into a df, the schema seems to be
reversed:

df = to_dataframe(data)
dataCopy = to_pcollection(df)
_ = dataCopy | beam.Map(print)
p.run()

I got:
BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')

It seems the values of the columns 'word' and 'val' are now swapped. The
problem seems to happen during to_dataframe(). If I print out df['word'], I
get '' and ''. I am not sure whether I am doing something wrong or there is
an issue in the schema conversion. Could someone help me take a look?
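
For what it's worth, the symptom looks like what you would get if values were written out in one field order (say alphabetical, as in the Row repr above) and read back positionally against another (the order the fields were declared in). This is only a guess at the mechanism, not something I have confirmed in Beam's code. The snippet below uses plain Python with no Beam involved, and `Schema`, `declared` and `record` are made-up names:

```python
from collections import namedtuple

declared = ("word", "val")              # Order the fields were written in.
alphabetical = tuple(sorted(declared))  # ("val", "word")

record = {"word": "a", "val": ""}

# Serialize the values in alphabetical field order...
values = tuple(record[name] for name in alphabetical)

# ...then read them back positionally against the declared order.
Schema = namedtuple("Schema", declared)
mismatched = Schema(*values)

# Matching by name instead of by position avoids the swap.
matched = Schema(**record)
```

Here `mismatched` ends up with word='' and val='a', the same shape as the output above, while `matched` keeps word='a'.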

Thanks, Xinyu


P1 issues report

2021-03-25 Thread Beam Jira Bot
This is your daily summary of Beam's current P1 issues, not including flaky 
tests.

See https://beam.apache.org/contribute/jira-priorities/#p1-critical for the 
meaning and expectations around P1 issues.

BEAM-12054: Mutator.close() has to be moved to @FinisheBundle in WriteFn 
and DeleteFn  (https://issues.apache.org/jira/browse/BEAM-12054)
BEAM-12050: ParDoTest TimerTests that use TestStream failing for portable 
FlinkRunner (https://issues.apache.org/jira/browse/BEAM-12050)
BEAM-12000: Support Python 3.9 in Apache Beam 
(https://issues.apache.org/jira/browse/BEAM-12000)
BEAM-11965: testSplitQueryFnWithLargeDataset timeout failures 
(https://issues.apache.org/jira/browse/BEAM-11965)
BEAM-11961: InfluxDBIOIT failing with unauthorized error 
(https://issues.apache.org/jira/browse/BEAM-11961)
BEAM-11959: Python Beam SDK Harness hangs when install pip packages 
(https://issues.apache.org/jira/browse/BEAM-11959)
BEAM-11922: 
org.apache.beam.examples.cookbook.MapClassIntegrationIT.testDataflowMapState 
has been failing in master (https://issues.apache.org/jira/browse/BEAM-11922)
BEAM-11886: MapState and SetState failing tests on Dataflow streaming 
(https://issues.apache.org/jira/browse/BEAM-11886)
BEAM-11862: Write To Kafka does not work 
(https://issues.apache.org/jira/browse/BEAM-11862)
BEAM-11828: JmsIO is not acknowledging messages correctly 
(https://issues.apache.org/jira/browse/BEAM-11828)
BEAM-11772: GCP BigQuery sink (file loads) uses runner determined sharding 
for unbounded data (https://issues.apache.org/jira/browse/BEAM-11772)
BEAM-11755: Cross-language consistency (RequiresStableInputs) is quietly 
broken (at least on portable flink runner) 
(https://issues.apache.org/jira/browse/BEAM-11755)
BEAM-11578: `dataflow_metrics` (python) fails with TypeError (when int 
overflowing?) (https://issues.apache.org/jira/browse/BEAM-11578)
BEAM-11434: Expose Spanner admin/batch clients in Spanner Accessor 
(https://issues.apache.org/jira/browse/BEAM-11434)
BEAM-11227: Upgrade beam-vendor-grpc-1_26_0-0.3 to fix CVE-2020-27216 
(https://issues.apache.org/jira/browse/BEAM-11227)
BEAM-11148: Kafka commitOffsetsInFinalize OOM on Flink 
(https://issues.apache.org/jira/browse/BEAM-11148)
BEAM-11017: Timer with dataflow runner can be set multiple times (dataflow 
runner) (https://issues.apache.org/jira/browse/BEAM-11017)
BEAM-10883: XmlIO parsing of multibyte characters 
(https://issues.apache.org/jira/browse/BEAM-10883)
BEAM-10861: Adds URNs and payloads to PubSub transforms 
(https://issues.apache.org/jira/browse/BEAM-10861)
BEAM-10663: CrossLanguageKafkaIOTest broken on Flink Runner 
(https://issues.apache.org/jira/browse/BEAM-10663)
BEAM-10573: CSV files are loaded several times if they are too large 
(https://issues.apache.org/jira/browse/BEAM-10573)
BEAM-10569: SpannerIO tests don't actually assert anything. 
(https://issues.apache.org/jira/browse/BEAM-10569)
BEAM-10288: Quickstart documents are out of date 
(https://issues.apache.org/jira/browse/BEAM-10288)
BEAM-10244: Populate requirements cache fails on poetry-based packages 
(https://issues.apache.org/jira/browse/BEAM-10244)
BEAM-10100: FileIO writeDynamic with AvroIO.sink not writing all data 
(https://issues.apache.org/jira/browse/BEAM-10100)
BEAM-9917: BigQueryBatchFileLoads dynamic destination 
(https://issues.apache.org/jira/browse/BEAM-9917)
BEAM-9564: Remove insecure ssl options from MongoDBIO 
(https://issues.apache.org/jira/browse/BEAM-9564)
BEAM-9455: Environment-sensitive provisioning for Dataflow 
(https://issues.apache.org/jira/browse/BEAM-9455)
BEAM-9154: Move Chicago Taxi Example to Python 3 
(https://issues.apache.org/jira/browse/BEAM-9154)
BEAM-8407: [SQL] Some Hive tests throw NullPointerException, but get marked 
as passing (Direct Runner) (https://issues.apache.org/jira/browse/BEAM-8407)
BEAM-7717: PubsubIO watermark tracking hovers near start of epoch 
(https://issues.apache.org/jira/browse/BEAM-7717)
BEAM-7716: PubsubIO returns empty message bodies for all messages read 
(https://issues.apache.org/jira/browse/BEAM-7716)
BEAM-7195: BigQuery - 404 errors for 'table not found' when using dynamic 
destinations - sometimes, new table fails to get created 
(https://issues.apache.org/jira/browse/BEAM-7195)
BEAM-7064: Conversion of timestamp from BigQuery row to Beam row loses 
precision (https://issues.apache.org/jira/browse/BEAM-7064)
BEAM-6839: User reports protobuf ClassChangeError running against 2.6.0 or 
above (https://issues.apache.org/jira/browse/BEAM-6839)
BEAM-6466: KafkaIO doesn't commit offsets while being used as bounded 
source (https://issues.apache.org/jira/browse/BEAM-6466)
BEAM-5997: EVENT_TIME timer throws exception when side input used 
(https://issues.apache.org/jira/browse/BEAM-5997)
BEAM-5305: Timeout handling in JdbcIO with Oracle java driver 
(https://issues.apache.or

Flaky test issue report

2021-03-25 Thread Beam Jira Bot
This is your daily summary of Beam's current flaky tests. These are P1 issues 
because they have a major negative impact on the community and make it hard to 
determine the quality of the software.

BEAM-12020: :sdks:java:container:java8:docker failing missing licenses 
(https://issues.apache.org/jira/browse/BEAM-12020)
BEAM-12019: 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky (https://issues.apache.org/jira/browse/BEAM-12019)
BEAM-11792: Python precommit failed (flaked?) installing package  
(https://issues.apache.org/jira/browse/BEAM-11792)
BEAM-11733: [beam_PostCommit_Java] [testFhirIO_Import|export] flaky 
(https://issues.apache.org/jira/browse/BEAM-11733)
BEAM-11666: 
apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution
 is flaky (https://issues.apache.org/jira/browse/BEAM-11666)
BEAM-11661: hdfsIntegrationTest flake: network not found (py38 postcommit) 
(https://issues.apache.org/jira/browse/BEAM-11661)
BEAM-11646: beam_PostCommit_XVR_Spark failing 
(https://issues.apache.org/jira/browse/BEAM-11646)
BEAM-11645: beam_PostCommit_XVR_Flink failing 
(https://issues.apache.org/jira/browse/BEAM-11645)
BEAM-11540: Linter sometimes flakes on apache_beam.dataframe.frames_test 
(https://issues.apache.org/jira/browse/BEAM-11540)
BEAM-11493: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyAndWindows
 (https://issues.apache.org/jira/browse/BEAM-11493)
BEAM-11492: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows
 (https://issues.apache.org/jira/browse/BEAM-11492)
BEAM-11491: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMultipleWindows
 (https://issues.apache.org/jira/browse/BEAM-11491)
BEAM-11490: Spark test failure: 
org.apache.beam.sdk.transforms.ReifyTimestampsTest.inValuesSucceeds 
(https://issues.apache.org/jira/browse/BEAM-11490)
BEAM-11489: Spark test failure: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-11489)
BEAM-11488: Spark test failure: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedCounterMetrics
 (https://issues.apache.org/jira/browse/BEAM-11488)
BEAM-11487: Spark test failure: 
org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsShouldApplyTimestamps
 (https://issues.apache.org/jira/browse/BEAM-11487)
BEAM-11486: Spark test failure: 
org.apache.beam.sdk.testing.PAssertTest.testSerializablePredicate 
(https://issues.apache.org/jira/browse/BEAM-11486)
BEAM-11485: Spark test failure: 
org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineNullValues 
(https://issues.apache.org/jira/browse/BEAM-11485)
BEAM-11484: Spark test failure: 
org.apache.beam.runners.core.metrics.MetricsPusherTest.pushesUserMetrics 
(https://issues.apache.org/jira/browse/BEAM-11484)
BEAM-11483: Spark PostCommit Test Improvements 
(https://issues.apache.org/jira/browse/BEAM-11483)
BEAM-10987: stager_test.py::StagerTest::test_with_main_session flaky on 
windows py3.6,3.7 (https://issues.apache.org/jira/browse/BEAM-10987)
BEAM-10968: flaky test: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-10968)
BEAM-10955: Flink Java Runner test flake: Could not find Flink job  
(https://issues.apache.org/jira/browse/BEAM-10955)
BEAM-10923: Python requirements installation in docker container is flaky 
(https://issues.apache.org/jira/browse/BEAM-10923)
BEAM-10901: Flaky test: 
PipelineInstrumentTest.test_able_to_cache_intermediate_unbounded_source_pcollection
 (https://issues.apache.org/jira/browse/BEAM-10901)
BEAM-10899: test_FhirIO_exportFhirResourcesGcs flake with OOM 
(https://issues.apache.org/jira/browse/BEAM-10899)
BEAM-10866: PortableRunnerTestWithSubprocesses.test_register_finalizations 
flaky on macOS (https://issues.apache.org/jira/browse/BEAM-10866)
BEAM-10763: Spotless flake (NullPointerException) 
(https://issues.apache.org/jira/browse/BEAM-10763)
BEAM-10590: BigQueryQueryToTableIT flaky: test_big_query_new_types 
(https://issues.apache.org/jira/browse/BEAM-10590)
BEAM-10589: Samza ValidatesRunner failure: 
testParDoWithSideInputsIsCumulative 
(https://issues.apache.org/jira/browse/BEAM-10589)
BEAM-10519: 
MultipleInputsAndOutputTests.testParDoWithSideInputsIsCumulative flaky on Samza 
(https://issues.apache.org/jira/browse/BEAM-10519)
BEAM-10504: Failure / flake in ElasticSearchIOTest > 
testWriteFullAddressing and testWriteWithIndexFn 
(https://issues.apache.org/jira/browse/BEAM-10504)
BEAM-10501: CheckGrafanaStalenessAlerts and PingGrafanaHttpApi fail with 
Connection refused (https://issues.apache.org/jira/browse/

Re: Python Dataframe API issue

2021-03-25 Thread Robert Bradshaw
This is definitely wrong. Looking into what's going on here, but this seems
severe enough to be a blocker for the next release.

On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu  wrote:

> Hi, folks,
>
> I am playing around with the Python Dataframe API, and seemingly hit a
> schema issue when converting a PCollection to a dataframe. I wrote the
> following code for a simple test:
>
> import apache_beam as beam
> from apache_beam.dataframe.convert import to_dataframe
> from apache_beam.dataframe.convert import to_pcollection
>
> p = beam.Pipeline()
> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(lambda
> x : beam.Row(word=x[0], val=x[1]))
> _ = data | beam.Map(print)
> p.run()
>
> This shows the following:
> Row(val='', word='a') Row(val='', word='b')
>
> But if I use to_dataframe() to convert it into a df, the schema seems to
> be reversed:
>
> df = to_dataframe(data)
> dataCopy = to_pcollection(df)
> _ = dataCopy | beam.Map(print)
> p.run()
>
> I got:
> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>
> It seems the columns 'word' and 'val' are now swapped. The problem seems
> to happen during to_dataframe(). If I print out df['word'], I get '' and
> ''. I am not sure whether I am doing something wrong or there is an
> issue in the schema conversion. Could someone help me take a look?
>
> Thanks, Xinyu
>
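
The two printouts in the report are consistent with a positional mismatch: the
Row prints its fields in sorted-name order (val, word), while the generated
schema lists them in declared order (word, val). The sketch below reproduces
that pattern in plain Python with no Beam involved. It is only an illustration
of how such a swap can arise, not a confirmed diagnosis of the bug;
make_sorted_row and Record are hypothetical names.

```python
from collections import namedtuple


def make_sorted_row(**fields):
    """Hypothetical Row-like factory that keeps fields in sorted-name order."""
    names = sorted(fields)
    return names, tuple(fields[name] for name in names)


# Schema in the order the fields were declared: word first, then val.
declared_schema = ("word", "val")
Record = namedtuple("Record", declared_schema)

names, values = make_sorted_row(word="a", val="")
print(names, values)  # ['val', 'word'] ('', 'a')

# Pairing the sorted values with the declared schema by position swaps them.
mismatched = Record(*values)
print(mismatched)  # Record(word='', val='a')

# Pairing by name keeps each value under its own column.
by_name = Record(**dict(zip(names, values)))
print(by_name)  # Record(word='a', val='')
```

The mismatched record has the same shape as the BeamSchema_... output quoted
above, with the 'a' value landing under val.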


Re: Python Dataframe API issue

2021-03-25 Thread Robert Bradshaw
This could be https://issues.apache.org/jira/browse/BEAM-11929

On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw  wrote:

> This is definitely wrong. Looking into what's going on here, but this
> seems severe enough to be a blocker for the next release.
>
> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu  wrote:
>
>> Hi, folks,
>>
>> I am playing around with the Python Dataframe API, and seemingly hit a
>> schema issue when converting a PCollection to a dataframe. I wrote the
>> following code for a simple test:
>>
>> import apache_beam as beam
>> from apache_beam.dataframe.convert import to_dataframe
>> from apache_beam.dataframe.convert import to_pcollection
>>
>> p = beam.Pipeline()
>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(lambda
>> x : beam.Row(word=x[0], val=x[1]))
>> _ = data | beam.Map(print)
>> p.run()
>>
>> This shows the following:
>> Row(val='', word='a') Row(val='', word='b')
>>
>> But if I use to_dataframe() to convert it into a df, the schema seems to
>> be reversed:
>>
>> df = to_dataframe(data)
>> dataCopy = to_pcollection(df)
>> _ = dataCopy | beam.Map(print)
>> p.run()
>>
>> I got:
>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>
>> It seems the columns 'word' and 'val' are now swapped. The problem seems
>> to happen during to_dataframe(). If I print out df['word'], I get '' and
>> ''. I am not sure whether I am doing something wrong or there is an
>> issue in the schema conversion. Could someone help me take a look?
>>
>> Thanks, Xinyu
>>
>


Re: Python Dataframe API issue

2021-03-25 Thread Brian Hulette
Yes, this looks like https://issues.apache.org/jira/browse/BEAM-11929. I
removed it from the release blockers since there is a workaround (use a
NamedTuple type), but it's probably worth cherry-picking the fix.
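
A minimal sketch of what the NamedTuple workaround might look like for the
pipeline in the report. The class name WordVal, the helper build_pipeline and
the sample values are made up for illustration, and this has not been checked
against the affected release. Registering the type with RowCoder follows the
pattern from the Beam schema documentation.

```python
import typing


class WordVal(typing.NamedTuple):
    """Hypothetical schema type; field order is fixed by the class body."""
    word: str
    val: str


def build_pipeline():
    # Beam is imported here so the NamedTuple above can be tried on its own.
    import apache_beam as beam
    from apache_beam.dataframe.convert import to_dataframe, to_pcollection

    # Tell Beam to treat WordVal as a schema'd row type.
    beam.coders.registry.register_coder(WordVal, beam.coders.RowCoder)

    p = beam.Pipeline()
    data = (
        p
        | beam.Create([('a', 'x'), ('b', 'y')])
        | beam.Map(lambda x: WordVal(word=x[0], val=x[1]))
            .with_output_types(WordVal))

    # Round-trip through the DataFrame API and print the rows.
    df = to_dataframe(data)
    _ = to_pcollection(df) | beam.Map(print)
    return p


print(WordVal._fields)  # ('word', 'val')
```

Calling build_pipeline().run() would execute the round trip. The point of the
NamedTuple is that the schema comes from one explicit declaration rather than
from the keyword arguments of a beam.Row(...) call.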

On Thu, Mar 25, 2021 at 4:44 PM Robert Bradshaw  wrote:

> This could be https://issues.apache.org/jira/browse/BEAM-11929
>
> On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw 
> wrote:
>
>> This is definitely wrong. Looking into what's going on here, but this
>> seems severe enough to be a blocker for the next release.
>>
>> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu  wrote:
>>
>>> Hi, folks,
>>>
>>> I am playing around with the Python Dataframe API, and seemingly hit a
>>> schema issue when converting a PCollection to a dataframe. I wrote the
>>> following code for a simple test:
>>>
>>> import apache_beam as beam
>>> from apache_beam.dataframe.convert import to_dataframe
>>> from apache_beam.dataframe.convert import to_pcollection
>>>
>>> p = beam.Pipeline()
>>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(lambda
>>> x : beam.Row(word=x[0], val=x[1]))
>>> _ = data | beam.Map(print)
>>> p.run()
>>>
>>> This shows the following:
>>> Row(val='', word='a') Row(val='', word='b')
>>>
>>> But if I use to_dataframe() to convert it into a df, the schema seems
>>> to be reversed:
>>>
>>> df = to_dataframe(data)
>>> dataCopy = to_pcollection(df)
>>> _ = dataCopy | beam.Map(print)
>>> p.run()
>>>
>>> I got:
>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>>
>>> It seems the columns 'word' and 'val' are now swapped. The problem seems
>>> to happen during to_dataframe(). If I print out df['word'], I get '' and
>>> ''. I am not sure whether I am doing something wrong or there is an
>>> issue in the schema conversion. Could someone help me take a look?
>>>
>>> Thanks, Xinyu
>>>
>>


Re: Python Dataframe API issue

2021-03-25 Thread Kenneth Knowles
Cloned to https://issues.apache.org/jira/browse/BEAM-12056

On Thu, Mar 25, 2021 at 4:46 PM Brian Hulette  wrote:

> Yes this looks like https://issues.apache.org/jira/browse/BEAM-11929, I
> removed it from the release blockers since there is a workaround (use a
> NamedTuple type), but it's probably worth cherrypicking the fix.
>
> On Thu, Mar 25, 2021 at 4:44 PM Robert Bradshaw 
> wrote:
>
>> This could be https://issues.apache.org/jira/browse/BEAM-11929
>>
>> On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw 
>> wrote:
>>
>>> This is definitely wrong. Looking into what's going on here, but this
>>> seems severe enough to be a blocker for the next release.
>>>
>>> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu  wrote:
>>>
>>>> Hi, folks,
>>>>
>>>> I am playing around with the Python Dataframe API, and seemingly hit a
>>>> schema issue when converting a PCollection to a dataframe. I wrote the
>>>> following code for a simple test:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.dataframe.convert import to_dataframe
>>>> from apache_beam.dataframe.convert import to_pcollection
>>>>
>>>> p = beam.Pipeline()
>>>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(
>>>> lambda x : beam.Row(word=x[0], val=x[1]))
>>>> _ = data | beam.Map(print)
>>>> p.run()
>>>>
>>>> This shows the following:
>>>> Row(val='', word='a') Row(val='', word='b')
>>>>
>>>> But if I use to_dataframe() to convert it into a df, the schema seems
>>>> to be reversed:
>>>>
>>>> df = to_dataframe(data)
>>>> dataCopy = to_pcollection(df)
>>>> _ = dataCopy | beam.Map(print)
>>>> p.run()
>>>>
>>>> I got:
>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>>>
>>>> It seems the columns 'word' and 'val' are now swapped. The problem seems
>>>> to happen during to_dataframe(). If I print out df['word'], I get '' and
>>>> ''. I am not sure whether I am doing something wrong or there is an
>>>> issue in the schema conversion. Could someone help me take a look?
>>>>
>>>> Thanks, Xinyu
>>>>
>>>


Re: Python Dataframe API issue

2021-03-25 Thread Robert Bradshaw
Thanks, Xinyu, for finding this!

On Thu, Mar 25, 2021 at 4:48 PM Kenneth Knowles  wrote:

> Cloned to https://issues.apache.org/jira/browse/BEAM-12056
>
> On Thu, Mar 25, 2021 at 4:46 PM Brian Hulette  wrote:
>
>> Yes this looks like https://issues.apache.org/jira/browse/BEAM-11929, I
>> removed it from the release blockers since there is a workaround (use a
>> NamedTuple type), but it's probably worth cherrypicking the fix.
>>
>> On Thu, Mar 25, 2021 at 4:44 PM Robert Bradshaw 
>> wrote:
>>
>>> This could be https://issues.apache.org/jira/browse/BEAM-11929
>>>
>>> On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> This is definitely wrong. Looking into what's going on here, but this
>>>> seems severe enough to be a blocker for the next release.
>>>>
>>>> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu wrote:
>>>>
>>>>> Hi, folks,
>>>>>
>>>>> I am playing around with the Python Dataframe API, and seemingly hit a
>>>>> schema issue when converting a PCollection to a dataframe. I wrote the
>>>>> following code for a simple test:
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>> from apache_beam.dataframe.convert import to_pcollection
>>>>>
>>>>> p = beam.Pipeline()
>>>>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(
>>>>> lambda x : beam.Row(word=x[0], val=x[1]))
>>>>> _ = data | beam.Map(print)
>>>>> p.run()
>>>>>
>>>>> This shows the following:
>>>>> Row(val='', word='a') Row(val='', word='b')
>>>>>
>>>>> But if I use to_dataframe() to convert it into a df, the schema seems
>>>>> to be reversed:
>>>>>
>>>>> df = to_dataframe(data)
>>>>> dataCopy = to_pcollection(df)
>>>>> _ = dataCopy | beam.Map(print)
>>>>> p.run()
>>>>>
>>>>> I got:
>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>>>>
>>>>> It seems the columns 'word' and 'val' are now swapped. The problem
>>>>> seems to happen during to_dataframe(). If I print out df['word'], I get
>>>>> '' and ''. I am not sure whether I am doing something wrong or there is
>>>>> an issue in the schema conversion. Could someone help me take a look?
>>>>>
>>>>> Thanks, Xinyu
>>>>>



Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Kenneth Knowles
On Thu, Mar 25, 2021 at 12:55 PM Robert Bradshaw 
wrote:

> On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez 
> wrote:
>
>>
>> *~Vincent*
>>
>>
>> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles  wrote:
>>
>>> The reason I was checking out the code is that sometimes a natural thing
>>> to output would be a summary of what was written. So each chunk of writes
>>> and the final chunk written in @FinishBundle. This is, for example, what
>>> SQL engines do (output # of rows written).
>>>
>>> You could output both the summary and the full list of written elements
>>> to different outputs, and users can choose. Outputs that are never consumed
>>> should be very low or zero cost.
>>>
>>>
>> I like this approach.  I would much prefer two outputs (one of which is
>> all elements written) to returning an existential/wildcard PCollection.
>>
>
> +1, this would work well too. Returning a PCollectionTuple is extensible
> too, as one could add more (or better) outputs in the future without
> changing the signature.
>

This comment is dangerously close to sparking a philosophical conversation!

Kenn
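
For readers skimming the quoted chain below: the @FinishBundle point (wait on
the async writes at the end of each bundle, and only then report elements as
written) can be sketched without Beam. The class below is a hypothetical
stand-in that only mimics the DoFn lifecycle method names; it is not the
CassandraIO code and not a Beam API, and a plain list stands in for the
database.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncWriteSketch:
    """Hypothetical stand-in for a write DoFn; not a Beam class."""

    def __init__(self, sink):
        self._sink = sink  # list-like target standing in for the database
        self._pool = None
        self._pending = []

    def setup(self):
        self._pool = ThreadPoolExecutor(max_workers=4)

    def start_bundle(self):
        self._pending = []

    def process(self, element):
        # Start the write, but do not report the element as written yet.
        future = self._pool.submit(self._sink.append, element)
        self._pending.append((element, future))
        return []

    def finish_bundle(self):
        # Wait for every write started in this bundle; result() re-raises
        # any failure, so only confirmed elements are returned.
        written = []
        for element, future in self._pending:
            future.result()
            written.append(element)
        self._pending = []
        return written

    def teardown(self):
        # Resource cleanup only; nothing is emitted from here.
        self._pool.shutdown()
        self._pool = None


sink = []
fn = AsyncWriteSketch(sink)
fn.setup()
fn.start_bundle()
emitted_early = fn.process("a") + fn.process("b")
emitted_late = fn.finish_bundle()
fn.teardown()
print(emitted_early, emitted_late)  # [] ['a', 'b']
```

Nothing is emitted from process, and teardown only releases the pool, which
matches the constraint that outputs cannot be produced during teardown.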


>
>>
>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw 
>>> wrote:
>>>
 Yeah, the entire input is not always what is needed, and can generally
 be achieved via

 input -> wait(side input of write) -> do something with the input

 Of course one could also do

 entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
 CombineGlobally(TrivialCombineFn)

 to reduce this to a more minimal set with at least one element per
 Window.

 The file writing operations emit the actual files that were written,
 which can be handy. My suggestion of PCollection was just so that we can
 emit something usable, and decide exactly what is the most useful is later.


 On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:

> I believe that the Wait transform turns this output into a side input,
> so outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles 
> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn I see a big problem:
>>
>> @Setup
>> public void setup() {
>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext c)
>>     throws ExecutionException, InterruptedException {
>>   writer.mutate(c.element());
>> }
>>
>> @Teardown
>> public void teardown() throws Exception {
>>   writer.close();
>>   writer = null;
>> }
>>
>> It is only in writer.close() that all async writes are waited on.
>> This needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the
>> element as "has been written". And you cannot output from the @Teardown
>> method, which is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato 
>> wrote:
>>
>>> How about a PCollection containing every element which was
>>> successfully written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after it's been successfully
>>> written to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>>> wrote:
>>>
 On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
 wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

 My proposal is that one returns a PCollection that consists,
 internally, of something contentless like nulls. This is future-compatible
 with returning something more meaningful based on the source or write
 process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


 Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but
> chan

Re: Python Dataframe API issue

2021-03-25 Thread Xinyu Liu
Np, thanks for quickly identifying the fix.

Btw, I am very happy that Beam Python supports the same pandas DataFrame
API. It's super user-friendly to both devs and data scientists. Really cool
work!

Thanks,
Xinyu

On Thu, Mar 25, 2021 at 4:53 PM Robert Bradshaw  wrote:

> Thanks, Xinyu, for finding this!
>
> On Thu, Mar 25, 2021 at 4:48 PM Kenneth Knowles  wrote:
>
>> Cloned to https://issues.apache.org/jira/browse/BEAM-12056
>>
>> On Thu, Mar 25, 2021 at 4:46 PM Brian Hulette 
>> wrote:
>>
>>> Yes this looks like https://issues.apache.org/jira/browse/BEAM-11929, I
>>> removed it from the release blockers since there is a workaround (use a
>>> NamedTuple type), but it's probably worth cherrypicking the fix.
>>>
>>> On Thu, Mar 25, 2021 at 4:44 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> This could be https://issues.apache.org/jira/browse/BEAM-11929
>>>>
>>>> On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw wrote:
>>>>
>>>>> This is definitely wrong. Looking into what's going on here, but this
>>>>> seems severe enough to be a blocker for the next release.
>>>>>
>>>>> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu wrote:
>>>>>
>>>>>> Hi, folks,
>>>>>>
>>>>>> I am playing around with the Python Dataframe API, and seemingly hit a
>>>>>> schema issue when converting a PCollection to a dataframe. I wrote the
>>>>>> following code for a simple test:
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>> from apache_beam.dataframe.convert import to_pcollection
>>>>>>
>>>>>> p = beam.Pipeline()
>>>>>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(
>>>>>> lambda x : beam.Row(word=x[0], val=x[1]))
>>>>>> _ = data | beam.Map(print)
>>>>>> p.run()
>>>>>>
>>>>>> This shows the following:
>>>>>> Row(val='', word='a') Row(val='', word='b')
>>>>>>
>>>>>> But if I use to_dataframe() to convert it into a df, the schema seems
>>>>>> to be reversed:
>>>>>>
>>>>>> df = to_dataframe(data)
>>>>>> dataCopy = to_pcollection(df)
>>>>>> _ = dataCopy | beam.Map(print)
>>>>>> p.run()
>>>>>>
>>>>>> I got:
>>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>>>>>
>>>>>> It seems the columns 'word' and 'val' are now swapped. The problem
>>>>>> seems to happen during to_dataframe(). If I print out df['word'], I
>>>>>> get '' and ''. I am not sure whether I am doing something wrong or
>>>>>> there is an issue in the schema conversion. Could someone help me take
>>>>>> a look?
>>>>>>
>>>>>> Thanks, Xinyu
>>>>>>
>>>>>