Re: Join a meeting to help coordinate implementing a Dask Runner for Beam

2022-08-03 Thread Brian Hulette via dev
I wanted to share that Ryan gave a presentation about his (and Charles')
work on Pangeo Forge at Scipy 2022 (in Austin just before Beam Summit!),
with a couple mentions of their transition to Beam [1]. There were also a
couple of other talks about Pangeo [2,3] with some Beam/xarray-beam
references in there.

[1]
https://www.youtube.com/watch?v=sY20UpYCAEE&list=PLYx7XA2nY5Gde0WF1yswQw5InhmSNED8o&index=9
[2]
https://www.youtube.com/watch?v=7niNfs3ZpfQ&list=PLYx7XA2nY5Gfb0tQyezb4Gsf1nVsy86zt&index=2
[3]
https://www.youtube.com/watch?v=ftlgOESINvo&list=PLYx7XA2nY5Gfb0tQyezb4Gsf1nVsy86zt&index=3

On Tue, Jun 21, 2022 at 9:29 AM Ahmet Altay  wrote:

> Were you able to meet? If yes, I would be very interested in a summary if
> someone would like to share that :)
>
> On Mon, Jun 13, 2022 at 9:16 AM Pablo Estrada  wrote:
>
>> Also added my availability... please do invite me as well : )
>> -P.
>>
>> On Mon, Jun 13, 2022 at 6:57 AM Kenneth Knowles  wrote:
>>
>>> I would love to try to join any meetings if you add me. My calendar is
>>> too chaotic to be useful on the when2meet :-) but I can often move things
>>> around.
>>>
>>> Kenn
>>>
>>> On Wed, Jun 8, 2022 at 2:50 PM Brian Hulette 
>>> wrote:
>>>
 Thanks for reaching out, Ryan, this sounds really cool. I added my
 availability to the calendar since I'm interested in this space, but I'm
 not sure I can offer much help - I don't have any experience building a
 runner; to date I've worked exclusively on the SDK side of Beam. So I hope
 some other folks can join as well :)

 @Pablo Estrada  might have some useful insight -
 he's been working on a spike to build a Ray runner.


 On Wed, Jun 8, 2022 at 12:53 PM Robert Bradshaw 
 wrote:

> This sounds like a great project. Unfortunately I wouldn't be able to
> meet next week, but would be happy to meet some other time, and if that
> doesn't work, answer questions over email, etc. Looking forward to a
> Dask runner.
>
> On Wed, Jun 8, 2022 at 9:04 AM Ryan Abernathey
>  wrote:
> >
> > Dear Beamers,
> >
> > Thank you for all of your work on this amazing project. I am new to
> Beam and am quite excited about its potential to help with some data
> processing challenges in my field of climate science.
> >
> > Our community is interested in running Beam on Dask Distributed
> clusters, which we already know how to deploy. This has been discussed at
> https://issues.apache.org/jira/browse/BEAM-5336 and
> https://github.com/apache/beam/issues/18962. It seems technically
> feasible.
> >
> > We are trying to organize a meeting next week to kickstart and
> coordinate this effort. It would be great if we could entrain some Beam
> maintainers into this meeting. If you have interest in this topic and are
> available next week, please share your availability here -
> https://www.when2meet.com/?15861604-jLnA4
> >
> > Alternatively, if you have any guidance or suggestions you wish to
> provide by email or GitHub discussion, we welcome your input.
> >
> > Thanks again for your open source work.
> >
> > Best,
> > Ryan Abernathey
> >
>



Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

2022-08-03 Thread Chamikara Jayalath via dev
On Wed, Aug 3, 2022 at 10:03 AM Chamikara Jayalath 
wrote:

>
>
> On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
> elizaveta.lomt...@akvelon.com> wrote:
>
>> Hi community!
>>
>> Our team is working on the SparkReceiverIO connector for Apache Beam. We
>> have published the SparkReceiverIO.Read PR [1].
>>
>>
>>   - We are working with the Spark Receiver class [2]. The Receiver
>>   should ideally implement the HasOffset interface [3] with the
>>   setStartOffset() method, so that we can start multiple Receivers
>>   from some offset for a specific Restriction.
>>   - Also, SplittableDoFn implies the presence of a RestrictionTracker,
>>   which has a .tryClaim(offset) method. So there should be a way to
>>   get the offset of the current record from the Receiver.
>>
>>
>> Let’s imagine we are dealing with a simple Receiver that doesn’t
>> implement the HasOffset interface [3], and we are thinking about using the
>> SDF approach for this case as well. There are some unresolved questions:
>>
>>   1. Since we don’t have the ability to start multiple Receivers from
>>   different offsets, we need to start only one main Receiver over
>>   [0; +inf). What is the best place to do this? (Currently, it’s the
>>   constructor of the SDF.)
>>
>>
> When you provide the initial restriction, you can simply provide a tracker
> (OffsetRangeTracker) that only contains one offset, so the range will be
> [0, 1].
>

Sorry, should be [0, 1).
Also, probably makes sense to develop an UnsplittableRangeTracker that can
also be used in other contexts. We have something similar for Python with
the old RangeTracker interface.
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/range_trackers.py#L301



> Within the splitRestriction() method, return the same restriction.
>
> This will result in an SDF that will not be split either through initial
> splitting or dynamic work rebalancing.
>
> This will not be very efficient, but hopefully this will generalize your
> solution for all receivers, whether they support offsets or not.
>
>
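
As a rough illustration of this suggestion, a minimal Java sketch of such an
unsplittable SDF could look like the following. The class name and the
receiver-draining hook are hypothetical; OffsetRange, OffsetRangeTracker, and
the SDF annotations are standard Beam APIs:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    // Sketch only: an SDF whose lone restriction [0, 1) is never split, so the
    // runner starts exactly one Receiver and cannot rebalance it away.
    @DoFn.UnboundedPerElement
    abstract class ReadFromSingleReceiverFn extends DoFn<byte[], String> {

      // Hypothetical hook: start the main Receiver and emit its records until done.
      protected abstract void runReceiverAndEmit(OutputReceiver<String> out);

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element byte[] element) {
        // A one-element range: tryClaim(0) succeeds once, then the range is done.
        return new OffsetRange(0, 1);
      }

      @SplitRestriction
      public void splitRestriction(
          @Restriction OffsetRange restriction, OutputReceiver<OffsetRange> out) {
        // Return the restriction unchanged: no initial splitting.
        out.output(restriction);
      }

      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
        return new OffsetRangeTracker(restriction);
      }

      @ProcessElement
      public ProcessContinuation processElement(
          RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
        if (!tracker.tryClaim(0L)) {
          return ProcessContinuation.stop();
        }
        // All reading happens inside this single claim; the runner cannot
        // checkpoint or rebalance it, which is why this is not very efficient.
        runReceiverAndEmit(out);
        return ProcessContinuation.stop();
      }
    }

With a one-element range and splitRestriction() returning its input unchanged,
neither initial splitting nor dynamic work rebalancing can break the
restriction apart.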
>>
>>   2. All records coming from the Receiver should be stored in some
>>   buffer. Since SDF objects are serialized, what is the best way to
>>   provide a link to the shared buffer for all of them?
>>   3. How do we correctly stop our main Receiver? (also a serialization
>>   problem)
>>   4. There are no tangible benefits from using SDF - we can’t
>>   parallelize reading, because of the single-threaded Receiver
>>   limitation.
>>   5. What if we are dealing with a Receiver that doesn’t have the
>>   ability to determine the offset of the current record?
>>
> Could you clarify why these questions could be addressed for
> UnboundedSource but not SDF? AFAICT these should be addressed with either
> source framework (probably in a similar way). For example, both types of
> source objects get serialized by runners.
>
> Thanks,
> Cham
>
>
>>
>> A possible solution that we see is to use the UnboundedSource approach,
>> as we did earlier in the Read from Spark Receiver via UnboundedSource PoC
>> branch [4]. It looks like we can resolve all the questions above by
>> implementing it. But UnboundedSource is deprecated.
>>
>> Could someone give us advice on how we can handle Receivers without
>> offsets in our case?
>>
>> Any ideas or comments would be greatly appreciated.
>>
>> Thanks for your attention!
>>
>> Elizaveta
>>
>> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 –
>> https://github.com/apache/beam/pull/17828
>>
>> [2] Spark Streaming Custom Receivers –
>> https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>
>> [3] HasOffset interface –
>> https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
>>
>> [4] SparkReceiverIO: Read via UnboundedSource –
>> https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
>>
>>
>>


Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

2022-08-03 Thread Chamikara Jayalath via dev
On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <
elizaveta.lomt...@akvelon.com> wrote:

> Hi community!
>
> Our team is working on the SparkReceiverIO connector for Apache Beam. We
> have published the SparkReceiverIO.Read PR [1].
>
>
>   - We are working with the Spark Receiver class [2]. The Receiver should
>   ideally implement the HasOffset interface [3] with the setStartOffset()
>   method, so that we can start multiple Receivers from some offset for a
>   specific Restriction.
>   - Also, SplittableDoFn implies the presence of a RestrictionTracker,
>   which has a .tryClaim(offset) method. So there should be a way to get
>   the offset of the current record from the Receiver.
>
>
> Let’s imagine we are dealing with a simple Receiver that doesn’t
> implement the HasOffset interface [3], and we are thinking about using the SDF
> approach for this case as well. There are some unresolved questions:
>
>   1. Since we don’t have the ability to start multiple Receivers from
>   different offsets, we need to start only one main Receiver over
>   [0; +inf). What is the best place to do this? (Currently, it’s the
>   constructor of the SDF.)
>
>
When you provide the initial restriction, you can simply provide a tracker
(OffsetRangeTracker) that only contains one offset, so the range will be
[0, 1].
Within the splitRestriction() method, return the same restriction.

This will result in an SDF that will not be split either through initial
splitting or dynamic work rebalancing.

This will not be very efficient, but hopefully this will generalize your
solution for all receivers, whether they support offsets or not.


>
>   2. All records coming from the Receiver should be stored in some
>   buffer. Since SDF objects are serialized, what is the best way to
>   provide a link to the shared buffer for all of them?
>   3. How do we correctly stop our main Receiver? (also a serialization
>   problem)
>   4. There are no tangible benefits from using SDF - we can’t
>   parallelize reading, because of the single-threaded Receiver
>   limitation.
>   5. What if we are dealing with a Receiver that doesn’t have the
>   ability to determine the offset of the current record?
>
>
Could you clarify why these questions could be addressed for
UnboundedSource but not SDF? AFAICT these should be addressed with either
source framework (probably in a similar way). For example, both types of
source objects get serialized by runners.

Thanks,
Cham


>
> A possible solution that we see is to use the UnboundedSource approach, as
> we did earlier in the Read from Spark Receiver via UnboundedSource PoC
> branch [4]. It looks like we can resolve all the questions above by
> implementing it. But UnboundedSource is deprecated.
>
> Could someone give us advice on how we can handle Receivers without
> offsets in our case?
>
> Any ideas or comments would be greatly appreciated.
>
> Thanks for your attention!
>
> Elizaveta
>
> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 –
> https://github.com/apache/beam/pull/17828
>
> [2] Spark Streaming Custom Receivers –
> https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>
> [3] HasOffset interface –
> https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
>
> [4] SparkReceiverIO: Read via UnboundedSource –
> https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
>
>
>


[Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset

2022-08-03 Thread Elizaveta Lomteva
Hi community!

Our team is working on the SparkReceiverIO connector for Apache Beam. We have 
published the SparkReceiverIO.Read PR [1].


  *   We are working with the Spark Receiver class [2]. The Receiver should
ideally implement the HasOffset interface [3] with the setStartOffset() method,
so that we can start multiple Receivers from some offset for a specific
Restriction.

  *   Also, SplittableDoFn implies the presence of a RestrictionTracker, which
has a .tryClaim(offset) method. So there should be a way to get the offset of
the current record from the Receiver.
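
To make the intended interplay concrete, here is a minimal sketch (not the PR
code): setStartOffset() comes from [3] and tryClaim() from the SDF model, while
the record feed and its (offset, value) shape are illustrative assumptions.

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.io.sparkreceiver.HasOffset;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.beam.sdk.values.KV;

    // Sketch only: position a HasOffset Receiver at the start of the claimed
    // restriction, then claim each record's offset before emitting the record.
    @DoFn.UnboundedPerElement
    abstract class HasOffsetReadFn extends DoFn<byte[], String> {

      // Hypothetical hooks: how the Receiver and its buffered (offset, value)
      // records are obtained is exactly what the questions below are about.
      protected abstract HasOffset receiver();
      protected abstract Iterable<KV<Long, String>> bufferedRecords();

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element byte[] element) {
        // Assumption: offsets start at 0 and grow without bound.
        return new OffsetRange(0, Long.MAX_VALUE);
      }

      @ProcessElement
      public ProcessContinuation processElement(
          RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
        // Start this Receiver at the beginning of its Restriction.
        receiver().setStartOffset(tracker.currentRestriction().getFrom());
        for (KV<Long, String> record : bufferedRecords()) {
          if (!tracker.tryClaim(record.getKey())) {
            return ProcessContinuation.stop(); // offset is outside this restriction
          }
          out.output(record.getValue());
        }
        return ProcessContinuation.resume(); // check again later for more records
      }
    }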


Let’s imagine we are dealing with a simple Receiver that doesn’t implement
the HasOffset interface [3], and we are thinking about using the SDF approach
for this case as well. There are some unresolved questions:

  1.  Since we don’t have the ability to start multiple Receivers from
different offsets, we need to start only one main Receiver over [0; +inf).
What is the best place to do this? (Currently, it’s the constructor of the SDF.)

  2.  All records coming from the Receiver should be stored in some buffer.
Since SDF objects are serialized, what is the best way to provide a link to
the shared buffer for all of them? (See the sketch after this list.)

  3.  How do we correctly stop our main Receiver? (also a serialization problem)

  4.  There are no tangible benefits from using SDF - we can’t parallelize
reading, because of the single-threaded Receiver limitation.

  5.  What if we are dealing with a Receiver that doesn’t have the ability to
determine the offset of the current record?
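
On question 2, one common Java workaround (a sketch of a general pattern, not
something settled in this thread) is to keep the buffer itself out of the
serialized object graph and reach it through a static registry, keyed by an ID
that is serialized with the SDF:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch only: the buffer lives in a static map, so it is never serialized.
    // The DoFn serializes just the String key; after deserialization on a
    // worker, every instance holding the same key finds (or lazily creates)
    // the same queue.
    class ReceiverBufferRegistry {
      private static final ConcurrentHashMap<String, Queue<String>> BUFFERS =
          new ConcurrentHashMap<>();

      static Queue<String> forId(String receiverId) {
        return BUFFERS.computeIfAbsent(receiverId, id -> new ConcurrentLinkedQueue<>());
      }
    }

Since static state is per JVM, this shares the buffer only among instances in
the same worker process - which matches the single main Receiver scenario - and
it does not by itself answer how to stop the Receiver (question 3).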


A possible solution that we see is to use the UnboundedSource approach, as we
did earlier in the Read from Spark Receiver via UnboundedSource PoC branch [4].
It looks like we can resolve all the questions above by implementing it. But
UnboundedSource is deprecated.


Could someone give us advice on how we can handle Receivers without offsets
in our case?

Any ideas or comments would be greatly appreciated.


Thanks for your attention!

Elizaveta


[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – 
https://github.com/apache/beam/pull/17828

[2] Spark Streaming Custom Receivers – 
https://spark.apache.org/docs/latest/streaming-custom-receivers.html

[3] HasOffset interface – 
https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java

[4] SparkReceiverIO: Read via UnboundedSource – 
https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0




Beam High Priority Issue Report

2022-08-03 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22543 [Bug]: ClassCastException when 
using custom DynamicDestination in BigQueryIO.Write
https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow 
SideInput LoadTests failing
https://github.com/apache/beam/issues/22401 [Bug]: BigQueryIO getFailedInserts 
fails when using Storage APIs 
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and 
fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at 
getConnection() in WriteFn
https://github.com/apache/beam/issues/22188 BigQuery Storage API sink sometimes 
gets stuck outputting to an invalid timestamp
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer 
whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 
failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in 
beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam 
PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 
failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure :  
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.runners.core.construction.SerializablePipelineOptions 
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with 
writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing 
new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21468 
beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load 
tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on 
failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable 
ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in 
beam_PostCommit_Java_DataflowV2  
https://github.com/apache/beam/issues/21270 
org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView
 flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being 
accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate 
BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
 is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 
apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally
 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked 
Dataflow Pipeline 
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21257 Either Create or DirectRunner fails 
to produce all elements to the following transform
https://github.com/apache/beam/issues/21123 Multiple jobs running on Flink 
session cluster reuse the persistent