Re: Join a meeting to help coordinate implementing a Dask Runner for Beam
I wanted to share that Ryan gave a presentation about his (and Charles') work on Pangeo Forge at SciPy 2022 (in Austin, just before Beam Summit!), with a couple of mentions of their transition to Beam [1]. There were also a couple of other talks about Pangeo [2,3] with some Beam/xarray-beam references in there.

[1] https://www.youtube.com/watch?v=sY20UpYCAEE&list=PLYx7XA2nY5Gde0WF1yswQw5InhmSNED8o&index=9
[2] https://www.youtube.com/watch?v=7niNfs3ZpfQ&list=PLYx7XA2nY5Gfb0tQyezb4Gsf1nVsy86zt&index=2
[3] https://www.youtube.com/watch?v=ftlgOESINvo&list=PLYx7XA2nY5Gfb0tQyezb4Gsf1nVsy86zt&index=3

On Tue, Jun 21, 2022 at 9:29 AM Ahmet Altay wrote:
> Were you able to meet? If yes, I would be very interested in a summary if someone would like to share that :)
>
> On Mon, Jun 13, 2022 at 9:16 AM Pablo Estrada wrote:
>> Also added my availability... please do invite me as well : )
>> -P.
>>
>> On Mon, Jun 13, 2022 at 6:57 AM Kenneth Knowles wrote:
>>> I would love to try to join any meetings if you add me. My calendar is too chaotic to be useful on the when2meet :-) but I can often move things around.
>>>
>>> Kenn
>>>
>>> On Wed, Jun 8, 2022 at 2:50 PM Brian Hulette wrote:

Thanks for reaching out, Ryan, this sounds really cool. I added my availability to the calendar since I'm interested in this space, but I'm not sure I can offer much help - I don't have any experience building a runner; to date I've worked exclusively on the SDK side of Beam. So I hope some other folks can join as well :) @Pablo Estrada might have some useful insight - he's been working on a spike to build a Ray runner.

On Wed, Jun 8, 2022 at 12:53 PM Robert Bradshaw wrote:
> This sounds like a great project. Unfortunately I wouldn't be able to meet next week, but would be happy to meet some other time, and if that doesn't work, answer questions over email, etc. Looking forward to a Dask runner.
> > On Wed, Jun 8, 2022 at 9:04 AM Ryan Abernathey wrote:
> >
> > Dear Beamer,
> >
> > Thank you for all of your work on this amazing project. I am new to Beam and am quite excited about its potential to help with some data processing challenges in my field of climate science.
> >
> > Our community is interested in running Beam on Dask Distributed clusters, which we already know how to deploy. This has been discussed at https://issues.apache.org/jira/browse/BEAM-5336 and https://github.com/apache/beam/issues/18962. It seems technically feasible.
> >
> > We are trying to organize a meeting next week to kickstart and coordinate this effort. It would be great if we could entrain some Beam maintainers into this meeting. If you have interest in this topic and are available next week, please share your availability here: https://www.when2meet.com/?15861604-jLnA4
> >
> > Alternatively, if you have any guidance or suggestions you wish to provide by email or GitHub discussion, we welcome your input.
> >
> > Thanks again for your open source work.
> >
> > Best,
> > Ryan Abernathey
Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset
On Wed, Aug 3, 2022 at 10:03 AM Chamikara Jayalath wrote:
>
> On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <elizaveta.lomt...@akvelon.com> wrote:
>> Hi community!
>>
>> Our team is working on the SparkReceiverIO connector for Apache Beam. We have published the SparkReceiverIO.Read PR [1].
>>
>> - We are working with the Spark Receiver class [2]. A Receiver should ideally implement the HasOffset interface [3] with the setStartOffset() method, so that we can start multiple Receivers from some offset for a specific Restriction.
>> - Also, SplittableDoFn implies the presence of a RestrictionTracker, which has a tryClaim(offset) method. So there should be a way to get the offset for the current record from the Receiver.
>>
>> Let's imagine we are dealing with a simple Receiver that doesn't implement the HasOffset interface [3], and we are thinking about using the SDF approach for this case as well. There are some unresolved questions:
>>
>> 1. Since we don't have the ability to start multiple Receivers from different offsets, we need to start only one main Receiver [0; +inf). What is the best place for doing this? (Currently, it's the constructor of the SDF.)
>>
> When you provide the initial restriction, you can simply provide a tracker (OffsetRangeTracker) that only contains one offset, so the range will be [0, 1].

Sorry, that should be [0, 1). Also, it probably makes sense to develop an UnsplittableRangeTracker that can also be used in other contexts. We have something similar for Python with the old RangeTracker interface:
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/range_trackers.py#L301

> Within the splitRestriction() method, return the same restriction.
>
> This will result in an SDF that will not be split either through initial splitting or dynamic work rebalancing.
> This will not be very efficient, but hopefully this will generalize your solution for all receivers, whether they support offsets or not.
>
>> 2. All records coming from the Receiver should be stored in some buffer. Since SDF objects are serialized, what is the best way to provide a link to the shared buffer for all of them?
>>
>> 3. How to correctly stop our main Receiver? (also a serialization problem)
>>
>> 4. There are no tangible benefits from using SDF - we can't parallelize reading, because there will be a single-thread Receiver limitation.
>>
>> 5. What if we are dealing with a Receiver that doesn't have the ability to determine the offset for the current record?
>>
> Could you clarify why these questions could be addressed for UnboundedSource but not SDF? AFAICT these should be addressed with either source framework (probably in a similar way). For example, both types of source objects get serialized by runners.
>
> Thanks,
> Cham
>
>> A possible solution that we see is to use the UnboundedSource approach, as we did earlier in the "Read from Spark Receiver via the UnboundedSource" PoC branch [4]. It looks like we can resolve all the questions above by implementing it. But UnboundedSource is deprecated.
>>
>> Could someone give us advice on how we can manage working with Receivers without offsets in our case?
>>
>> Any ideas or comments would be greatly appreciated.
>>
>> Thanks for your attention to it!
>> Elizaveta
>>
>> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – https://github.com/apache/beam/pull/17828
>> [2] Spark Streaming Custom Receivers – https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> [3] HasOffset interface – https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
>> [4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
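The UnsplittableRangeTracker idea suggested above can be sketched outside the Beam SDK as a tracker that claims offsets normally but refuses every split request. This is a minimal, self-contained illustration: the class and method names loosely mirror Beam's Python RestrictionTracker conventions but are assumptions for this sketch, not the actual SDK API.

```python
# Minimal sketch (no Beam SDK) of an "unsplittable" offset tracker: it
# claims offsets inside its range but always refuses splits, so a runner
# could never subdivide the restriction. Names are illustrative only.

class OffsetRange:
    def __init__(self, start, stop):
        self.start, self.stop = start, stop  # half-open [start, stop)

class UnsplittableOffsetTracker:
    def __init__(self, offset_range):
        self._range = offset_range
        self._last_claimed = None

    def try_claim(self, offset):
        # A claim succeeds only while the offset lies inside the range.
        if self._range.start <= offset < self._range.stop:
            self._last_claimed = offset
            return True
        return False

    def try_split(self, fraction_of_remainder):
        # Always refuse: the restriction can never be split dynamically.
        return None

# A single-offset range [0, 1) has exactly one claimable position, which
# keeps the whole receiver read as one unit of work.
tracker = UnsplittableOffsetTracker(OffsetRange(0, 1))
print(tracker.try_claim(0))    # True: the only claimable offset
print(tracker.try_claim(1))    # False: outside [0, 1)
print(tracker.try_split(0.5))  # None: splitting always refused
```

Combined with splitRestriction() returning its input unchanged, this gives the unsplittable-SDF behavior described in the reply, at the cost of all parallelism.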
Re: [Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset
On Wed, Aug 3, 2022 at 8:57 AM Elizaveta Lomteva <elizaveta.lomt...@akvelon.com> wrote:
> Hi community!
>
> Our team is working on the SparkReceiverIO connector for Apache Beam. We have published the SparkReceiverIO.Read PR [1].
>
> - We are working with the Spark Receiver class [2]. A Receiver should ideally implement the HasOffset interface [3] with the setStartOffset() method, so that we can start multiple Receivers from some offset for a specific Restriction.
> - Also, SplittableDoFn implies the presence of a RestrictionTracker, which has a tryClaim(offset) method. So there should be a way to get the offset for the current record from the Receiver.
>
> Let's imagine we are dealing with a simple Receiver that doesn't implement the HasOffset interface [3], and we are thinking about using the SDF approach for this case as well. There are some unresolved questions:
>
> 1. Since we don't have the ability to start multiple Receivers from different offsets, we need to start only one main Receiver [0; +inf). What is the best place for doing this? (Currently, it's the constructor of the SDF.)

When you provide the initial restriction, you can simply provide a tracker (OffsetRangeTracker) that only contains one offset, so the range will be [0, 1]. Within the splitRestriction() method, return the same restriction.

This will result in an SDF that will not be split either through initial splitting or dynamic work rebalancing. This will not be very efficient, but hopefully this will generalize your solution for all receivers, whether they support offsets or not.

> 2. All records coming from the Receiver should be stored in some buffer. Since SDF objects are serialized, what is the best way to provide a link to the shared buffer for all of them?
>
> 3. How to correctly stop our main Receiver? (also a serialization problem)
> 4. There are no tangible benefits from using SDF - we can't parallelize reading, because there will be a single-thread Receiver limitation.
>
> 5. What if we are dealing with a Receiver that doesn't have the ability to determine the offset for the current record?

Could you clarify why these questions could be addressed for UnboundedSource but not SDF? AFAICT these should be addressed with either source framework (probably in a similar way). For example, both types of source objects get serialized by runners.

Thanks,
Cham

> A possible solution that we see is to use the UnboundedSource approach, as we did earlier in the "Read from Spark Receiver via the UnboundedSource" PoC branch [4]. It looks like we can resolve all the questions above by implementing it. But UnboundedSource is deprecated.
>
> Could someone give us advice on how we can manage working with Receivers without offsets in our case?
>
> Any ideas or comments would be greatly appreciated.
>
> Thanks for your attention to it!
>
> Elizaveta
>
> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – https://github.com/apache/beam/pull/17828
> [2] Spark Streaming Custom Receivers – https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> [3] HasOffset interface – https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
> [4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
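The "return the same restriction from splitRestriction()" advice can be sketched as a plain-Python mock of the two SDF hooks involved. The class and method names below are illustrative stand-ins for the Beam SDF methods, not the real SDK signatures; the point is only the shape of the logic.

```python
# Plain-Python mock (not the Beam API) of the two SDF hooks discussed
# above: the initial restriction covers a single offset, and splitting
# returns the restriction unchanged, so initial splitting yields exactly
# one bundle.

class ReceiverReadFn:
    def initial_restriction(self, element):
        # One claimable offset: the whole receiver read is one unit of work.
        return (0, 1)  # the half-open range [0, 1)

    def split_restriction(self, element, restriction):
        # Returning the input restriction as the only "split" disables
        # initial splitting; paired with a tracker whose try_split()
        # always refuses, the SDF is never parallelized.
        return [restriction]

fn = ReceiverReadFn()
r = fn.initial_restriction(element=None)
print(fn.split_restriction(None, r))  # [(0, 1)] - a single restriction
```

As noted in the reply, this sacrifices all parallelism but works uniformly for receivers with and without offsets.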
[Cdap IO] SparkReceiverIO SDF vs UnboundedSource for Spark Receivers without offset
Hi community!

Our team is working on the SparkReceiverIO connector for Apache Beam. We have published the SparkReceiverIO.Read PR [1].

* We are working with the Spark Receiver class [2]. A Receiver should ideally implement the HasOffset interface [3] with the setStartOffset() method, so that we can start multiple Receivers from some offset for a specific Restriction.
* Also, SplittableDoFn implies the presence of a RestrictionTracker, which has a tryClaim(offset) method. So there should be a way to get the offset for the current record from the Receiver.

Let's imagine we are dealing with a simple Receiver that doesn't implement the HasOffset interface [3], and we are thinking about using the SDF approach for this case as well. There are some unresolved questions:

1. Since we don't have the ability to start multiple Receivers from different offsets, we need to start only one main Receiver [0; +inf). What is the best place for doing this? (Currently, it's the constructor of the SDF.)
2. All records coming from the Receiver should be stored in some buffer. Since SDF objects are serialized, what is the best way to provide a link to the shared buffer for all of them?
3. How to correctly stop our main Receiver? (also a serialization problem)
4. There are no tangible benefits from using SDF - we can't parallelize reading, because there will be a single-thread Receiver limitation.
5. What if we are dealing with a Receiver that doesn't have the ability to determine the offset for the current record?

A possible solution that we see is to use the UnboundedSource approach, as we did earlier in the "Read from Spark Receiver via the UnboundedSource" PoC branch [4]. It looks like we can resolve all the questions above by implementing it. But UnboundedSource is deprecated.

Could someone give us advice on how we can manage working with Receivers without offsets in our case?

Any ideas or comments would be greatly appreciated.

Thanks for your attention to it!
Elizaveta

[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – https://github.com/apache/beam/pull/17828
[2] Spark Streaming Custom Receivers – https://spark.apache.org/docs/latest/streaming-custom-receivers.html
[3] HasOffset interface – https://github.com/apache/beam/blob/0581c49575eeba9df8be2a166c6923209fa8f7a5/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
[4] SparkReceiverIO: Read via UnboundedSource – https://github.com/apache/beam/pull/17360/files#diff-795caf376b2257e6669096a9048490d4935aff573e636617eb431d379e330db0
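For question 2 (sharing a buffer among serialized SDF instances), one common pattern is to serialize only a small key and keep the actual buffer in a process-level registry, so every deserialized copy re-attaches to the same in-memory queue instead of trying to pickle the queue itself. The names below (BufferedReadFn, get_buffer) are hypothetical, a sketch of the pattern rather than anything from the PR.

```python
# Hypothetical sketch: serialize only a buffer_id; the real buffer lives
# in a process-wide registry, recreated lazily in each worker process.

import pickle
import queue

_BUFFERS = {}  # process-wide registry: buffer_id -> Queue

def get_buffer(buffer_id):
    # Created lazily on first access within each process.
    return _BUFFERS.setdefault(buffer_id, queue.Queue())

class BufferedReadFn:
    def __init__(self, buffer_id):
        self.buffer_id = buffer_id  # only this small key is serialized

    def __getstate__(self):
        return {"buffer_id": self.buffer_id}

    def __setstate__(self, state):
        self.buffer_id = state["buffer_id"]

    def buffer(self):
        return get_buffer(self.buffer_id)

fn = BufferedReadFn("receiver-0")
fn.buffer().put("record-1")

# A pickled/unpickled copy re-attaches to the same in-process buffer.
copy = pickle.loads(pickle.dumps(fn))
print(copy.buffer().get())  # record-1
```

Note this only shares the buffer within one process; separate worker processes each get their own registry, which is consistent with the single-receiver, single-bundle setup discussed in the thread.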
Beam High Priority Issue Report
This is your daily summary of Beam's current high priority issues that may need attention. See https://beam.apache.org/contribute/issue-priorities for the meaning and expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22543 [Bug]: ClassCastException when using custom DynamicDestination in BigQueryIO.Write
https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow SideInput LoadTests failing
https://github.com/apache/beam/issues/22401 [Bug]: BigQueryIO getFailedInserts fails when using Storage APIs
https://github.com/apache/beam/issues/22321 PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at getConnection() in WriteFn
https://github.com/apache/beam/issues/22188 BigQuery Storage API sink sometimes gets stuck outputting to an invalid timestamp
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 --dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: Connection refused
https://github.com/apache/beam/issues/21468 beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in beam_PostCommit_Java_DataflowV2
https://github.com/apache/beam/issues/21270 org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked Dataflow Pipeline
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not follow spec
https://github.com/apache/beam/issues/21261 org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit data at GC time
https://github.com/apache/beam/issues/21257 Either Create or DirectRunner fails to produce all elements to the following transform
https://github.com/apache/beam/issues/21123 Multiple jobs running on Flink session cluster reuse the persistent