Re: BigTable reader for Python?

2022-07-28 Thread Chamikara Jayalath via dev
On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson  wrote:

> Thanks for the detailed answers!
>
> I totally get the points about development & maintenance cost, and,
> from a user perspective, about getting the performance right.
>
> I decided to try out the Spanner connector to get a sense of how well
> the x-language approach works in our world, since that's an existing
> x-language connector.
> Overall, it works and with minimal intervention as you say - it is
> very slow, though.
> I'm a little confused about "portable runners" - if I understand this
> correctly, this means we couldn't run with the DirectRunner anymore if
> using an x-language connector? (At least it didn't work when I tried
> it.)
>

You'll have to use the portable DirectRunner -
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability

Job service for this can be started using following command:
python apache_beam/runners/portability/local_job_service_main.py -p 

Instructions for using this should be similar to here (under "Portable
(Java/Python/Go)"): https://beam.apache.org/documentation/runners/flink/


>
> My test of running a trivial GCS-to-Spanner job with 18 KB of input on
> Dataflow takes about 15 minutes end-to-end. 5+ minutes of that is
> uploading the expansion service to GCS, and the startup time on
> Dataflow takes several minutes as well:
> "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
> upload to
> gs://dataflow-staging-us-central1-92d40d9a13427cbb4dfa41465ce57494/beamapp-lina-0728173601-761137-4rfo0mb9.1659029761.762052/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
> in 337 seconds."
> Is that expected, or are we doing something strange here? My internet
> isn't very fast here, so these up/downloads can really slow things
> down.
> I tried adding --prebuild_sdk_container_engine=cloud_build but that
> doesn't affect the .jar file.
>

There are several things contributing to the end-to-end execution time.

* Time to stage dependencies including the shaded jar file (that is used
both by the expansion service and at runtime).

This is cross-language only. But you control the jar file. You are trying
to use the
existing beam-sdks-java-io-google-cloud-platform-expansion-service jar
which is a 114 MB file.
https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.39.0

Not exactly sure why it took 337 seconds. But could possibly be a network
issue. You could also define a new smaller expansion service jar just for
Spanner if needed.

* Time to start the job
This is mostly common for both cross-language and non-cross-language jobs.
Starting up the Dataflow worker pool could take some time. Cross-language
could take slightly longer since we need to start both Java and Python
containers but this is a fixed cost (not dependent on the job/input size).

* Time to execute the job.
This is what I'd compare if you want to decide on a pure-Python vs a Java
cross-language implementation just based on performance. Cross-language
version would have an added cost to serialize data and send across SDK
harness containers (within the same VM for Dataflow).
On the other hand cross-language version would be reading using a
Java implementation which I expected to be more performant than a pure
Python read implementation.

Hope this helps.

Thanks,
Cham




>
> If we can get this to a workable time, and/or iterate locally, then I
> think an x-language connector for Bigtable could work out well.
> Otherwise we might have to look at a native Python version after all.
>
> Thanks!
> -Lina
>
> On Wed, Jul 27, 2022 at 1:39 PM Chamikara Jayalath 
> wrote:
> >
> >
> >
> > On Wed, Jul 27, 2022 at 11:10 AM Lina Mårtensson 
> wrote:
> >>
> >> Thanks Cham!
> >>
> >> Could you provide some more detail on your preference for developing a
> >> Python wrapper rather than implementing a source purely in Python?
> >
> >
> > I've mentioned the main advantages of developing a cross-language
> transform over natively implementing this in Python below.
> >
> > * Reduced cost of development
> >
> > It's much easier to  develop a cross-language wrapper of the Java
> source than re-implementing the source in Python. Sources are some of the
> most complex
> > code we have in Beam and sources control the parallelization of the
> pipeline (for example, splitting and dynamic work rebalancing for supported
> runners). So getting this code wrong can result in hard to track data
> loss/duplication related issues.
> > Additionally, based on my experience, it's very hard to get a source
> implementation correct and performant on the first try. It could take
> additional benchmarks/user feedback over time to get the source production
> ready.
> > Java BT source is already battle tested well (actually we have two Java
> implementations [1][2] currently). So I would rather use a Java BT
> connector as a 

Re: BigTable reader for Python?

2022-07-28 Thread Lina Mårtensson via dev
Thanks for the detailed answers!

I totally get the points about development & maintenance cost, and,
from a user perspective, about getting the performance right.

I decided to try out the Spanner connector to get a sense of how well
the x-language approach works in our world, since that's an existing
x-language connector.
Overall, it works and with minimal intervention as you say - it is
very slow, though.
I'm a little confused about "portable runners" - if I understand this
correctly, this means we couldn't run with the DirectRunner anymore if
using an x-language connector? (At least it didn't work when I tried
it.)

My test of running a trivial GCS-to-Spanner job with 18 KB of input on
Dataflow takes about 15 minutes end-to-end. 5+ minutes of that is
uploading the expansion service to GCS, and the startup time on
Dataflow takes several minutes as well:
"INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
upload to 
gs://dataflow-staging-us-central1-92d40d9a13427cbb4dfa41465ce57494/beamapp-lina-0728173601-761137-4rfo0mb9.1659029761.762052/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
in 337 seconds."
Is that expected, or are we doing something strange here? My internet
isn't very fast here, so these up/downloads can really slow things
down.
I tried adding --prebuild_sdk_container_engine=cloud_build but that
doesn't affect the .jar file.

If we can get this to a workable time, and/or iterate locally, then I
think an x-language connector for Bigtable could work out well.
Otherwise we might have to look at a native Python version after all.

Thanks!
-Lina

On Wed, Jul 27, 2022 at 1:39 PM Chamikara Jayalath  wrote:
>
>
>
> On Wed, Jul 27, 2022 at 11:10 AM Lina Mårtensson  wrote:
>>
>> Thanks Cham!
>>
>> Could you provide some more detail on your preference for developing a
>> Python wrapper rather than implementing a source purely in Python?
>
>
> I've mentioned the main advantages of developing a cross-language transform 
> over natively implementing this in Python below.
>
> * Reduced cost of development
>
> It's much easier to  develop a cross-language wrapper of the Java  source 
> than re-implementing the source in Python. Sources are some of the most 
> complex
> code we have in Beam and sources control the parallelization of the pipeline 
> (for example, splitting and dynamic work rebalancing for supported runners). 
> So getting this code wrong can result in hard to track data loss/duplication 
> related issues.
> Additionally, based on my experience, it's very hard to get a source 
> implementation correct and performant on the first try. It could take 
> additional benchmarks/user feedback over time to get the source production 
> ready.
> Java BT source is already battle tested well (actually we have two Java 
> implementations [1][2] currently). So I would rather use a Java BT connector 
> as a cross-language transform than re-implementing sources for other SDKs.
>
> * Minimal maintenance cost
>
> Developing a source/sink is just a part of the story. We (as a community) 
> have to maintain it over time and make sure that ongoing issues/feature 
> requests are adequately handled. In the past, we have had cases where 
> sources/sinks are available for multiple SDKs but one
> is significantly better than others when it comes to the feature set (for 
> example, BigQuery). Cross-language will make this easier and will allow us to 
> maintain key logic in a single place.
>
>>
>>
>> If I look at the instructions for using the x-language Spanner
>> connector, then using this - from the user's perspective - would
>> involve installing a Java runtime.
>> That's not terrible, but I fear that getting this to work with bazel
>> might end up being more trouble than expected. (That has often
>> happened here, and we have enough trouble with getting Python 3.9 and
>> 3.10 to co-exist.)
>
>
> From an end user perspective, all they should have to do is make sure that 
> Java is available in the machine where the job is submitted from. Beam has 
> features to allow starting up cross-language expansion services (that is 
> needed during job submission) automatically so users should not have to do 
> anything other than that.
>
> At job execution, Beam (portable) uses Docker-based SDK harness containers 
> and we already release appropriate containers for each SDK. The runners 
> should seamlessly download containers needed to execute the job.
>
> That said, the main downside of cross-language today is runner support. 
> Cross-language transform support is only available for portable Beam runners 
> (for example, Dataflow Runner v2) but this is the direction Beam runners are 
> going anyway.
>
>>
>>
>> There are a few of us at our small start-up that have written
>> MapReduces and similar in the past and are completely convinced by the
>> Beam/Dataflow model. But many others have no previous experience and
>> are skeptical, and see 

Re: [Release] 2.41.0 release update

2022-07-28 Thread Kiley Sok via dev
Quick update for today:

I'm still working through the validation tests, but we currently have 2
open issues:
https://github.com/apache/beam/issues/22454
https://github.com/apache/beam/issues/22188



On Wed, Jul 27, 2022 at 5:03 PM Kiley Sok  wrote:

> Hi all,
>
> I've cut the release branch:
> https://github.com/apache/beam/tree/release-2.41.0
>
> There's one known issue
> 
>  that
> needs to be cherry picked. Please let me know if you have a change that
> needs to go in.
>
> Thanks,
> Kiley
>
>


Re: [RFC] State & Timers API Design for Go SDK

2022-07-28 Thread Austin Bennett
Looks great!

On Thu, Jul 28, 2022 at 10:54 AM Jack McCluskey via dev 
wrote:

> Great write-up on state and timers! The solution you chose feels very
> in-line with how the Go SDK works. Make sure the design doc makes it onto
> the wiki once you've addressed any feedback!
>
> On Thu, Jul 28, 2022 at 1:49 PM Kerry Donny-Clark via dev <
> dev@beam.apache.org> wrote:
>
>> I think this a perfect example of a clear design doc. Great, deeply
>> detailed alternatives considered and why they were rejected. This makes
>> review easy, and lets us follow your thought process.
>> I think this is a good implementation, and I support the chosen approach.
>> Kerry
>>
>> On Thu, Jul 28, 2022 at 1:41 PM Kenneth Knowles  wrote:
>>
>>> Really thorough. Love it!
>>>
>>> On Thu, Jul 28, 2022 at 9:02 AM Ritesh Ghorse via dev <
>>> dev@beam.apache.org> wrote:
>>>
 Hey everyone,

 Danny  and I have been working on
 designing the state and timers for Go SDK. We wrote a design doc with
 user-facing API, execution details, and different alternatives considered.
 It would be really helpful if we could get your
 suggestions/feedback/comments on the design.

 Design Doc:
 https://docs.google.com/document/d/1rcKa1Z6orDDFr1l8t6NA1eLl6zanQbYAEiAqk39NQUU/edit?usp=sharing

 Thanks!
 Ritesh Ghorse

>>>


Re: [RFC] State & Timers API Design for Go SDK

2022-07-28 Thread Jack McCluskey via dev
Great write-up on state and timers! The solution you chose feels very
in-line with how the Go SDK works. Make sure the design doc makes it onto
the wiki once you've addressed any feedback!

On Thu, Jul 28, 2022 at 1:49 PM Kerry Donny-Clark via dev <
dev@beam.apache.org> wrote:

> I think this a perfect example of a clear design doc. Great, deeply
> detailed alternatives considered and why they were rejected. This makes
> review easy, and lets us follow your thought process.
> I think this is a good implementation, and I support the chosen approach.
> Kerry
>
> On Thu, Jul 28, 2022 at 1:41 PM Kenneth Knowles  wrote:
>
>> Really thorough. Love it!
>>
>> On Thu, Jul 28, 2022 at 9:02 AM Ritesh Ghorse via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hey everyone,
>>>
>>> Danny  and I have been working on
>>> designing the state and timers for Go SDK. We wrote a design doc with
>>> user-facing API, execution details, and different alternatives considered.
>>> It would be really helpful if we could get your
>>> suggestions/feedback/comments on the design.
>>>
>>> Design Doc:
>>> https://docs.google.com/document/d/1rcKa1Z6orDDFr1l8t6NA1eLl6zanQbYAEiAqk39NQUU/edit?usp=sharing
>>>
>>> Thanks!
>>> Ritesh Ghorse
>>>
>>


Re: Runner benchmarks in portable mode

2022-07-28 Thread Kenneth Knowles
Your question isn't for me, but I just want to say that I am really happy
to hear you are doing this. I would like to get more continuous
benchmarking so we can reduce any overheads Beam might introduce, for
example on Samza in your case. And I would like to basically focus entirely
on portable mode. (I do think that a runner like Samza could execute Java
DoFns in classic style and Python in portable all in the same deployment,
if you were ambitious)

Kenn

On Thu, Jul 28, 2022 at 10:31 AM Bharath Kumara Subramanian <
codin.mart...@gmail.com> wrote:

> Hi,
>
> We are currently working on making beam portable mode mainstream in
> addition to supporting classic mode for Samza runner.
>
> I was looking at OSS benchmarks on how other runners performed in portable
> mode in comparison with the classic mode. However, all I found was performance
> numbers and metrics for various classic runners
> 
> .
>
> Checking in to see if anyone in the community has benchmarked portable
> mode numbers for their runners.
>
> Additionally, I found vanilla metrics around GRPC performance
> , although I am
> looking for pointers to get granular insights on E2E pipeline latency.
> e.g., the time spent on network across stages vs serialization cost for
> GRPC vs actual time spent executing the ParDO and so-on.
>
>
> Thanks,
> Bharath
>
>


Re: [RFC] State & Timers API Design for Go SDK

2022-07-28 Thread Kerry Donny-Clark via dev
I think this a perfect example of a clear design doc. Great, deeply
detailed alternatives considered and why they were rejected. This makes
review easy, and lets us follow your thought process.
I think this is a good implementation, and I support the chosen approach.
Kerry

On Thu, Jul 28, 2022 at 1:41 PM Kenneth Knowles  wrote:

> Really thorough. Love it!
>
> On Thu, Jul 28, 2022 at 9:02 AM Ritesh Ghorse via dev 
> wrote:
>
>> Hey everyone,
>>
>> Danny  and I have been working on
>> designing the state and timers for Go SDK. We wrote a design doc with
>> user-facing API, execution details, and different alternatives considered.
>> It would be really helpful if we could get your
>> suggestions/feedback/comments on the design.
>>
>> Design Doc:
>> https://docs.google.com/document/d/1rcKa1Z6orDDFr1l8t6NA1eLl6zanQbYAEiAqk39NQUU/edit?usp=sharing
>>
>> Thanks!
>> Ritesh Ghorse
>>
>


Re: [RFC] State & Timers API Design for Go SDK

2022-07-28 Thread Kenneth Knowles
Really thorough. Love it!

On Thu, Jul 28, 2022 at 9:02 AM Ritesh Ghorse via dev 
wrote:

> Hey everyone,
>
> Danny  and I have been working on
> designing the state and timers for Go SDK. We wrote a design doc with
> user-facing API, execution details, and different alternatives considered.
> It would be really helpful if we could get your
> suggestions/feedback/comments on the design.
>
> Design Doc:
> https://docs.google.com/document/d/1rcKa1Z6orDDFr1l8t6NA1eLl6zanQbYAEiAqk39NQUU/edit?usp=sharing
>
> Thanks!
> Ritesh Ghorse
>


Beam Dependency Check Report (2022-07-28)

2022-07-28 Thread Apache Jenkins Server
<<< text/html; charset=UTF-8: Unrecognized >>>


Re: Output after Pipeline replaceAll

2022-07-28 Thread Kenneth Knowles
Yes, this is expected. The goal of replaceAll is to replace a transform
with a different subgraph that implements precisely the same semantics. And
since the rest of the graph depends on the PCollections, the new expanded
transform is wired directly to the old outputs.

First point: Certainly the streaming mode should also support bounded
inputs, so the test as-is should still pass.

But still, it does seem potentially useful to increase test coverage by
re-using the same test with an unbounded-but-finite input like you are
doing. I am not sure this will work for all tests. Some transforms may
require bounded input.

But to force it: You may be able to just run the visitor and set the
boundedness property on the PCollections. Then of course your SparkRunner
translation layer needs to know what to do. It may be simpler to just have
a flag on your translator that translates Create.Values into something that
looks unbounded at the Spark layer.

Kenn

On Thu, Jul 28, 2022 at 2:01 AM Moritz Mack  wrote:

> Hi all,
>
>
>
> Wondering if somebody could help and shed some lights on the behavior of
> Pipeline.replaceAll, particularly the outputs to expect after the
> replacement.
>
> I’m currently looking into supporting VR tests for SparkRunner in
> streaming mode [1]. Unfortunately, I didn’t succeed replacing (wrapping)
> the unbounded Create.Values used as test input into an unbounded source in
> a way that the node’s output would be UNBOUNDED. After the replacement the
> output is actually still the original one.
>
> Is this expected? What would be the recommended way to achieve this?
>
>
>
> Below some code to explain further [2].
>
> Also, related, [3] is a tiny PR to fix a broken assertion in
> PipelineTest.testReplaceAll().
>
>
>
> Thanks,
>
> Moritz
>
>
>
> pipeline.apply("boundedToUnbounded", Create.of(0L));
>
>
>
> pipeline.replaceAll(
>
> ImmutableList.of(
>
> PTransformOverride.of(
>
> application -> application.getTransform() instanceof
> Create.Values,
>
> // Replacement is
> GenerateSequence.from(Iterables.getOnlyElement(elements)))
>
> new ValuesToUnboundedSequenceOverride(;
>
>
>
> pipeline.traverseTopologically(
>
> new PipelineVisitor.Defaults() {
>
>   @Override
>
>   public CompositeBehavior enterCompositeTransform(Node node) {
>
> if (node.getFullName().equals("boundedToUnbounded")) {
>
>   assertThat(node.getTransform(),
> Matchers.instanceOf(GenerateSequence.class));
>
>
>
>   // FIXME Node still contains the original BOUNDED output.
> But why?
>
>   PCollection output =
> Iterables.getOnlyElement(node.getOutputs().values());
>
>   assertThat(output.isBounded(),
> Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
>
> }
>
> return CompositeBehavior.ENTER_TRANSFORM;
>
>   }
>
> });
>
>
>
>
>
> [1] https://github.com/apache/beam/pull/22473
>
> [2]
> https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
>
>
> [3] https://github.com/apache/beam/pull/22485
>
>
>
> *As a recipient of an email from Talend, your contact personal data will
> be on our systems. Please see our privacy notice.
> *
>
>
>


Runner benchmarks in portable mode

2022-07-28 Thread Bharath Kumara Subramanian
Hi,

We are currently working on making beam portable mode mainstream in
addition to supporting classic mode for Samza runner.

I was looking at OSS benchmarks on how other runners performed in portable
mode in comparison with the classic mode. However, all I found was performance
numbers and metrics for various classic runners

.

Checking in to see if anyone in the community has benchmarked portable mode
numbers for their runners.

Additionally, I found vanilla metrics around GRPC performance
, although I am
looking for pointers to get granular insights on E2E pipeline latency.
e.g., the time spent on network across stages vs serialization cost for
GRPC vs actual time spent executing the ParDO and so-on.


Thanks,
Bharath


Re: [ANNOUNCE] New committer: Steven Niemitz

2022-07-28 Thread Shanfang Zhao
Congrats Steve!

On Thu, Jul 28, 2022 at 9:46 AM Ahmet Altay via dev 
wrote:

> Congratulations Steve!
>
> On Thu, Jul 21, 2022 at 10:31 AM Steve Niemitz via dev <
> dev@beam.apache.org> wrote:
>
>> Thanks everyone!
>>
>> On Thu, Jul 21, 2022 at 2:23 AM Moritz Mack  wrote:
>>
>>> Congrats, Steven!
>>>
>>>
>>>
>>> On 21.07.22, 05:25, "Evan Galpin"  wrote:
>>>
>>>
>>>
>>> Congrats! Well deserved! On Wed, Jul 20, 2022 at 15:⁠​17 Chamikara
>>> Jayalath via dev  wrote:⁠​ Congrats, Steve!
>>> On Wed, Jul 20, 2022, 9:⁠​16 AM Austin Bennett >> ​gmail.⁠​com> wrote:⁠​ Great!   ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍
>>>
>>> ZjQcmQRYFpfptBannerStart
>>>
>>> ZjQcmQRYFpfptBannerEnd
>>>
>>> Congrats! Well deserved!
>>>
>>>
>>>
>>> On Wed, Jul 20, 2022 at 15:17 Chamikara Jayalath via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>> Congrats, Steve!
>>>
>>>
>>>
>>> On Wed, Jul 20, 2022, 9:16 AM Austin Bennett <
>>> whatwouldausti...@gmail.com> wrote:
>>>
>>> Great!
>>>
>>>
>>>
>>> On Wed, Jul 20, 2022 at 10:11 AM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
>>> Congrats, Steve!
>>>
>>>
>>>
>>> On Wed, Jul 20, 2022 at 3:10 AM Jan Lukavský  wrote:
>>>
>>> Congrats Steve!
>>>
>>> On 7/20/22 06:20, Reuven Lax via dev wrote:
>>>
>>> Welcome Steve!
>>>
>>>
>>>
>>> On Tue, Jul 19, 2022 at 1:05 PM Connell O'Callaghan via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>
>>> +++1 Woohoo! Congratulations Steven (and to the BEAM community) on this
>>> announcement!!!
>>>
>>>
>>>
>>> Thank you Luke for this update
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jul 19, 2022 at 12:34 PM Robert Burke 
>>> wrote:
>>>
>>> Woohoo! Welcome and congratulations Steven!
>>>
>>>
>>>
>>> On Tue, Jul 19, 2022, 12:40 PM Luke Cwik via dev 
>>> wrote:
>>>
>>> Hi all,
>>>
>>> Please join me and the rest of the Beam PMC in welcoming
>>> a new committer: Steven Niemitz (sniemitz@)
>>>
>>> Steven started contributing to Beam in 2017 fixing bugs and improving
>>> logging and usability. Stevens most recent focus has been on performance
>>> optimizations within the Java SDK.
>>>
>>>
>>>
>>> Considering the time span and number of contributions, the Beam PMC
>>> trusts Steven with the responsibilities of a Beam committer. [1]
>>>
>>>
>>> Thank you Steven! And we are looking to see more of your contributions!
>>>
>>>
>>>
>>> Luke, on behalf of the Apache Beam PMC
>>>
>>> [1]
>>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>> 
>>>
>>> *As a recipient of an email from Talend, your contact personal data will
>>> be on our systems. Please see our privacy notice.
>>> *
>>>
>>>
>>> --
Best,
Shanfang


Re: [ANNOUNCE] New committer: Steven Niemitz

2022-07-28 Thread Ahmet Altay via dev
Congratulations Steve!

On Thu, Jul 21, 2022 at 10:31 AM Steve Niemitz via dev 
wrote:

> Thanks everyone!
>
> On Thu, Jul 21, 2022 at 2:23 AM Moritz Mack  wrote:
>
>> Congrats, Steven!
>>
>>
>>
>> On 21.07.22, 05:25, "Evan Galpin"  wrote:
>>
>>
>>
>> Congrats! Well deserved! On Wed, Jul 20, 2022 at 15:⁠​17 Chamikara
>> Jayalath via dev  wrote:⁠​ Congrats, Steve!
>> On Wed, Jul 20, 2022, 9:⁠​16 AM Austin Bennett > ​gmail.⁠​com> wrote:⁠​ Great!   ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍
>>
>> ZjQcmQRYFpfptBannerStart
>>
>> ZjQcmQRYFpfptBannerEnd
>>
>> Congrats! Well deserved!
>>
>>
>>
>> On Wed, Jul 20, 2022 at 15:17 Chamikara Jayalath via dev <
>> dev@beam.apache.org> wrote:
>>
>> Congrats, Steve!
>>
>>
>>
>> On Wed, Jul 20, 2022, 9:16 AM Austin Bennett 
>> wrote:
>>
>> Great!
>>
>>
>>
>> On Wed, Jul 20, 2022 at 10:11 AM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>> Congrats, Steve!
>>
>>
>>
>> On Wed, Jul 20, 2022 at 3:10 AM Jan Lukavský  wrote:
>>
>> Congrats Steve!
>>
>> On 7/20/22 06:20, Reuven Lax via dev wrote:
>>
>> Welcome Steve!
>>
>>
>>
>> On Tue, Jul 19, 2022 at 1:05 PM Connell O'Callaghan via dev <
>> dev@beam.apache.org> wrote:
>>
>>
>> +++1 Woohoo! Congratulations Steven (and to the BEAM community) on this
>> announcement!!!
>>
>>
>>
>> Thank you Luke for this update
>>
>>
>>
>>
>>
>> On Tue, Jul 19, 2022 at 12:34 PM Robert Burke  wrote:
>>
>> Woohoo! Welcome and congratulations Steven!
>>
>>
>>
>> On Tue, Jul 19, 2022, 12:40 PM Luke Cwik via dev 
>> wrote:
>>
>> Hi all,
>>
>> Please join me and the rest of the Beam PMC in welcoming a new committer:
>> Steven Niemitz (sniemitz@)
>>
>> Steven started contributing to Beam in 2017 fixing bugs and improving
>> logging and usability. Stevens most recent focus has been on performance
>> optimizations within the Java SDK.
>>
>>
>>
>> Considering the time span and number of contributions, the Beam PMC
>> trusts Steven with the responsibilities of a Beam committer. [1]
>>
>>
>> Thank you Steven! And we are looking to see more of your contributions!
>>
>>
>>
>> Luke, on behalf of the Apache Beam PMC
>>
>> [1]
>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>> 
>>
>> *As a recipient of an email from Talend, your contact personal data will
>> be on our systems. Please see our privacy notice.
>> *
>>
>>
>>


[RFC] State & Timers API Design for Go SDK

2022-07-28 Thread Ritesh Ghorse via dev
Hey everyone,

Danny  and I have been working on designing the
state and timers for Go SDK. We wrote a design doc with user-facing API,
execution details, and different alternatives considered. It would be
really helpful if we could get your suggestions/feedback/comments on the
design.

Design Doc:
https://docs.google.com/document/d/1rcKa1Z6orDDFr1l8t6NA1eLl6zanQbYAEiAqk39NQUU/edit?usp=sharing

Thanks!
Ritesh Ghorse


Oracle Database Connection Pool creation from Beam

2022-07-28 Thread Koka, Deepthi via dev
Hi Team,

We have an issue with the Oracle connections being used up and we have tried to 
implement a pooled data source using PooledDataSourceFactory, somehow we are 
ending up with "Invalid Universal Connection Pool Configuration: 
oracle.ucp.UniversalConnectionPoolException: Universal Connection Pool already 
exists in the Universal Connection Pool Manager.

Can you please suggest us a standard way of using Pooled data sources in Apache 
beam?

Regards,
Deepthi.

--
This email and any files transmitted with it are confidential and intended 
solely for the use of the addressee. If you are not the intended addressee, 
then you have received this email in error and any use, dissemination, 
forwarding, printing, or copying of this email is strictly prohibited. Please 
notify us immediately of your unintended receipt by reply and then delete this 
email and your reply. Tyson Foods, Inc. and its subsidiaries and affiliates 
will not be held liable to any person resulting from the unintended or 
unauthorized use of any information contained in this email or as a result of 
any additions or deletions of information originally contained in this email.


Re: Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

2022-07-28 Thread Jan Lukavský

Hi Sandeep,

looking into the code, can you please elaborate on how the reading 
thread holds the lock for ever? From what I understand from the code the 
lock is released after each call to reader.advance(). Therefore the 
checkpoint should not be blocked "for ever". Am I missing something? 
Could you maybe take thread dump and send dumps of those two threads?


Thanks,

 Jan

On 7/27/22 15:58, John Casey via dev wrote:
Would it be possible to recreate the experiments to try and isolate 
variables? Right now the 3 cases change both beam and flink versions.




On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles  wrote:

Bumping this and adding +John Casey
 who knows about KafkaIO and
unbounded sources, though probably less about the FlinkRunner. It
seems you have isolated it to the Flink translation logic. I'm not
sure who would be the best expert to evaluate if that logic is
still OK.

Kenn

On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep
 wrote:

Hi,

   We have a stateless application which

 1. Reads from kafka
 2. Doing some stateless transformations by reading from in
memory databases and updating the records
 3. Writing back to Kafka.

*With Beam 2.23 and Flink 1.9, we are seeing checkpoints are
working fine (it takes max 1 min).*

**

*With Beam 2.29 and Flink 1.12, we are seeing checkpoints
taking longer time (it takes max 6-7 min sometimes)*

**

*With Beam 2.38 and Flink 1.14, we are seeing checkpoints
timing out after 10 minutes.*

I am checking Beam code and after some logging and analysis
found the problem is at

https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307

We are still using the old API to read from Kafka and not yet
using KafkaIO based on SplittableDoFn.

There are two threads

 1. Legacy source thread reading from kafka and doing entire
processing.
 2. Thread which emits watermark on timer

https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474

Both these code blocks are in synchronized block waiting for
same checkpoint lock. Under heavy load, the thread reading
from kafka is running for ever in the while loop and  the
thread emitting the watermarks is waiting for ever to get the
lock not emitting the watermarks and the checkpoint times out.

Is it a known issue and do we have any solution here? For now
we are putting Thread.sleep(1) once for every 10 sec after the
synchronized block so that the thread emitting the watermarks
can be unblocked and run.

One of my colleagues tried to follow up on this (attaching the
previous email here) but we didn’t get any reply. Any help on
this would be appreciated.

Thanks,

Sandeep


Beam High Priority Issue Report

2022-07-28 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow 
SideInput LoadTests failing
https://github.com/apache/beam/issues/22401 [Bug]: BigQueryIO getFailedInserts 
fails when using Storage APIs 
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and 
fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at 
getConnection() in WriteFn
https://github.com/apache/beam/issues/22188 BigQuery Storage API sink sometimes 
gets stuck outputting to an invalid timestamp
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer 
whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 
failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in 
beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam 
PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 
failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure :  
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.runners.core.construction.SerializablePipelineOptions 
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with 
writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing 
new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21468 
beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load 
tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on 
failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable 
ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in 
beam_PostCommit_Java_DataflowV2  
https://github.com/apache/beam/issues/21270 
org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView
 flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being 
accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate 
BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
 is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 
apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally
 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked 
Dataflow Pipeline 
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21257 Either Create or DirectRunner fails 
to produce all elements to the following transform
https://github.com/apache/beam/issues/21123 Multiple jobs running on Flink 
session cluster reuse the persistent Python environment.
https://github.com/apache/beam/issues/21121 

Output after Pipeline replaceAll

2022-07-28 Thread Moritz Mack
Hi all,

Wondering if somebody could help and shed some lights on the behavior of 
Pipeline.replaceAll, particularly the outputs to expect after the replacement.
I’m currently looking into supporting VR tests for SparkRunner in streaming 
mode [1]. Unfortunately, I didn’t succeed replacing (wrapping) the unbounded 
Create.Values used as test input into an unbounded source in a way that the 
node’s output would be UNBOUNDED. After the replacement the output is actually 
still the original one.
Is this expected? What would be the recommended way to achieve this?

Below some code to explain further [2].
Also, related, [3] is a tiny PR to fix a broken assertion in 
PipelineTest.testReplaceAll().

Thanks,
Moritz

pipeline.apply("boundedToUnbounded", Create.of(0L));

pipeline.replaceAll(
ImmutableList.of(
PTransformOverride.of(
application -> application.getTransform() instanceof 
Create.Values,
// Replacement is 
GenerateSequence.from(Iterables.getOnlyElement(elements)))
new ValuesToUnboundedSequenceOverride(;

pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
  @Override
  public CompositeBehavior enterCompositeTransform(Node node) {
if (node.getFullName().equals("boundedToUnbounded")) {
  assertThat(node.getTransform(), 
Matchers.instanceOf(GenerateSequence.class));

  // FIXME Node still contains the original BOUNDED output. But why?
  PCollection output = 
Iterables.getOnlyElement(node.getOutputs().values());
  assertThat(output.isBounded(), 
Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
}
return CompositeBehavior.ENTER_TRANSFORM;
  }
});


[1] https://github.com/apache/beam/pull/22473
[2] 
https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485


As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice.